diff --git a/cmd/gitaly-ssh/auth_test.go b/cmd/gitaly-ssh/auth_test.go
index 30512120191cfdacb632e818597ab1b289cb44c7..088005601d98aef329cb0fb831dcf143a7cb38ac 100644
--- a/cmd/gitaly-ssh/auth_test.go
+++ b/cmd/gitaly-ssh/auth_test.go
@@ -268,6 +268,7 @@ func TestConnectivity(t *testing.T) {
 				backchannel.NewRegistry(),
 				cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
 				nil,
+				nil,
 				server.TransactionMiddleware{},
 			)
 			t.Cleanup(sf.Stop)
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 848c0ac61628e819fd673d36c8daf7195f20b3f2..b5ea271c7a7b88630be25191113c998784d36d2e 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -46,6 +46,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitlab"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+	housekeepingmw "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/housekeeping"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
@@ -557,12 +558,16 @@ func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error {
 	housekeepingManager := housekeepingmgr.New(cfg.Prometheus, logger, transactionManager, node)
 	prometheus.MustRegister(housekeepingManager)
 
+	housekeepingMiddleware := housekeepingmw.NewHousekeepingMiddleware(logger, protoregistry.GitalyProtoPreregistered, localrepoFactory, housekeepingManager, 20)
+	defer housekeepingMiddleware.WaitForWorkers()
+
 	gitalyServerFactory := server.NewGitalyServerFactory(
 		cfg,
 		logger,
 		registry,
 		diskCache,
 		[]*limithandler.LimiterMiddleware{perRPCLimitHandler},
+		housekeepingMiddleware,
 		txMiddleware,
 	)
 	defer gitalyServerFactory.Stop()
diff --git a/internal/featureflag/ff_housekeeping_middleware.go b/internal/featureflag/ff_housekeeping_middleware.go
new file mode 100644
index 0000000000000000000000000000000000000000..bbbd4771f3ceaf73ba7c1a37e032e2f66009b1c6
--- /dev/null
+++ b/internal/featureflag/ff_housekeeping_middleware.go
@@ -0,0 +1,9 @@
+package featureflag
+
+// HousekeepingMiddleware enables the housekeeping scheduler middleware.
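+// When enabled, the middleware counts mutator RPCs per repository and
+// schedules a heuristical housekeeping run once the configured write
+// threshold is exceeded.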
+var HousekeepingMiddleware = NewFeatureFlag(
+	"housekeeping_middleware",
+	"v18.1.0",
+	"https://gitlab.com/gitlab-org/gitaly/-/issues/6761",
+	false,
+)
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index cccced187798fe8386447845354f022dc3076024..a50083334c00e398da1b2923a66388b716f79b8f 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -248,7 +248,7 @@ func runServer(t *testing.T, cfg config.Cfg) string {
 	limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)
 	updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, logger, locator, hookManager, gitCmdFactory, catfileCache)
 
-	srv, err := NewGitalyServerFactory(cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}, TransactionMiddleware{}).New(true, false)
+	srv, err := NewGitalyServerFactory(cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}, nil, TransactionMiddleware{}).New(true, false)
 	require.NoError(t, err)
 
 	localRepoFactory := localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache)
@@ -293,6 +293,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
 		backchannel.NewRegistry(),
 		cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
 		[]*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)},
+		nil,
 		TransactionMiddleware{},
 	).New(true, true)
 	require.NoError(t, err)
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go
index dc9da257d89ae7d9dd838971bce51aa812bad73b..273eb44e4178897fa958e60285a9af0ddcdccd67 100644
--- a/internal/gitaly/server/server.go
+++ b/internal/gitaly/server/server.go
@@ -137,6 +137,11 @@ func (s *GitalyServerFactory) New(external, secure bool, opts ...Option) (*grpc.Server, error) {
 		unaryServerInterceptors = append(unaryServerInterceptors, limitHandler.UnaryInterceptor())
 	}
 
+	if s.housekeepingMiddleware != nil {
+		streamServerInterceptors = append(streamServerInterceptors, s.housekeepingMiddleware.StreamServerInterceptor())
+		unaryServerInterceptors = append(unaryServerInterceptors, s.housekeepingMiddleware.UnaryServerInterceptor())
+	}
+
 	streamServerInterceptors = append(streamServerInterceptors,
 		grpctracing.StreamServerTracingInterceptor(),
 		cache.StreamInvalidator(s.cacheInvalidator, protoregistry.GitalyProtoPreregistered, s.logger),
diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go
index bb4296a9b935db2be138f7b25e51eb29df1c67d1..ca41c08cd5082c342cde434c220caeb9717fe94b 100644
--- a/internal/gitaly/server/server_factory.go
+++ b/internal/gitaly/server/server_factory.go
@@ -6,6 +6,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v16/internal/cache"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/housekeeping"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
 	"google.golang.org/grpc"
@@ -13,14 +14,15 @@ import (
 
 // GitalyServerFactory is a factory of gitaly grpc servers
 type GitalyServerFactory struct {
-	registry         *backchannel.Registry
-	cacheInvalidator cache.Invalidator
-	limitHandlers    []*limithandler.LimiterMiddleware
-	cfg              config.Cfg
-	logger           log.Logger
-	externalServers  []*grpc.Server
-	internalServers  []*grpc.Server
-	txMiddleware     TransactionMiddleware
+	registry               *backchannel.Registry
+	cacheInvalidator       cache.Invalidator
+	limitHandlers          []*limithandler.LimiterMiddleware
+	cfg                    config.Cfg
+	logger                 log.Logger
+	externalServers        []*grpc.Server
+	internalServers        []*grpc.Server
+	txMiddleware           TransactionMiddleware
+	housekeepingMiddleware *housekeeping.Middleware
 }
 
 // TransactionMiddleware collects transaction middleware into a single struct that can be
@@ -39,15 +41,17 @@ func NewGitalyServerFactory(
 	registry *backchannel.Registry,
 	cacheInvalidator cache.Invalidator,
 	limitHandlers []*limithandler.LimiterMiddleware,
+	housekeepingMiddleware *housekeeping.Middleware,
 	txMiddleware TransactionMiddleware,
 ) *GitalyServerFactory {
 	return &GitalyServerFactory{
-		cfg:              cfg,
-		logger:           logger,
-		registry:         registry,
-		cacheInvalidator: cacheInvalidator,
-		limitHandlers:    limitHandlers,
-		txMiddleware:     txMiddleware,
+		cfg:                    cfg,
+		logger:                 logger,
+		registry:               registry,
+		cacheInvalidator:       cacheInvalidator,
+		limitHandlers:          limitHandlers,
+		txMiddleware:           txMiddleware,
+		housekeepingMiddleware: housekeepingMiddleware,
 	}
 }
 
diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go
index 10ad0200e227d265fd51bad7bac93ae31d2cfc0e..a61dd45b389382ea104bbb8378450e922b147f17 100644
--- a/internal/gitaly/server/server_factory_test.go
+++ b/internal/gitaly/server/server_factory_test.go
@@ -89,6 +89,7 @@ func TestGitalyServerFactory(t *testing.T) {
 			backchannel.NewRegistry(),
 			cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
 			nil,
+			nil,
 			TransactionMiddleware{},
 		)
 		t.Cleanup(sf.Stop)
@@ -110,6 +111,7 @@ func TestGitalyServerFactory(t *testing.T) {
 			backchannel.NewRegistry(),
 			cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
 			nil,
+			nil,
 			TransactionMiddleware{},
 		)
 		t.Cleanup(sf.Stop)
@@ -132,6 +134,7 @@ func TestGitalyServerFactory(t *testing.T) {
 			backchannel.NewRegistry(),
 			cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
 			nil,
+			nil,
 			TransactionMiddleware{},
 		)
 		t.Cleanup(sf.Stop)
@@ -159,6 +162,7 @@ func TestGitalyServerFactory(t *testing.T) {
 			backchannel.NewRegistry(),
 			cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
 			nil,
+			nil,
 			TransactionMiddleware{},
 		)
 		t.Cleanup(sf.Stop)
@@ -191,6 +195,7 @@ func TestGitalyServerFactory(t *testing.T) {
 			backchannel.NewRegistry(),
 			cache.New(cfg, config.NewLocator(cfg), logger),
 			nil,
+			nil,
 			TransactionMiddleware{},
 		)
 		t.Cleanup(sf.Stop)
@@ -227,6 +232,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) {
 		backchannel.NewRegistry(),
 		cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
 		nil,
+		nil,
 		TransactionMiddleware{},
 	)
 	defer sf.Stop()
diff --git a/internal/grpc/middleware/housekeeping/middleware.go b/internal/grpc/middleware/housekeeping/middleware.go
new file mode 100644
index 0000000000000000000000000000000000000000..8b0a8f61cb6860287752cf2b0e3ace2c41f6fe96
--- /dev/null
+++ b/internal/grpc/middleware/housekeeping/middleware.go
@@ -0,0 +1,262 @@
+package housekeeping
+
+import (
+	"context"
+	"sync"
+
+	"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/manager"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+	"google.golang.org/grpc"
+	"google.golang.org/protobuf/proto"
+)
+
+// activity tracks housekeeping activity for a specific relative path.
+type activity struct {
+	// writeCount is the number of mutator RPCs observed since housekeeping last started.
+	writeCount int
+	// active denotes whether a housekeeping run is currently executing.
+	active bool
+}
+
+type repoKey struct {
+	storage, relativePath string
+}
+
+// Middleware manages scheduling of housekeeping tasks by intercepting gRPC requests.
+type Middleware struct {
+	// interval is the write-count threshold; housekeeping is scheduled once more
+	// than interval mutator RPCs have been observed for a repository.
+	interval     int
+	repoActivity map[repoKey]*activity
+
+	// mu guards repoActivity.
+	mu sync.Mutex
+	wg sync.WaitGroup
+
+	logger           log.Logger
+	registry         *protoregistry.Registry
+	manager          manager.Manager
+	localRepoFactory localrepo.Factory
+}
+
+// NewHousekeepingMiddleware returns a new middleware.
+func NewHousekeepingMiddleware(logger log.Logger, registry *protoregistry.Registry, factory localrepo.Factory, manager manager.Manager, interval int) *Middleware {
+	return &Middleware{
+		interval:         interval,
+		logger:           logger,
+		registry:         registry,
+		localRepoFactory: factory,
+		manager:          manager,
+		repoActivity:     make(map[repoKey]*activity),
+	}
+}
+
+// WaitForWorkers waits for any active housekeeping tasks to finish.
+func (m *Middleware) WaitForWorkers() {
+	m.wg.Wait()
+}
+
+// UnaryServerInterceptor returns gRPC unary middleware.
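+// Maintenance RPCs are rejected with AlreadyExists while housekeeping is
+// already running for the target repository; mutator RPCs increment the
+// per-repository write counter and may schedule an asynchronous housekeeping
+// run; all other RPCs pass through untouched.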
+func (m *Middleware) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
+	return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
+		if featureflag.HousekeepingMiddleware.IsDisabled(ctx) {
+			return handler(ctx, req)
+		}
+
+		methodInfo, err := m.registry.LookupMethod(info.FullMethod)
+		if err != nil {
+			m.logger.WithError(err).ErrorContext(ctx, "lookup method for housekeeping")
+			return handler(ctx, req)
+		}
+
+		targetRepo, err := methodInfo.TargetRepo(req.(proto.Message))
+		if err != nil {
+			m.logger.WithError(err).ErrorContext(ctx, "lookup target repository for housekeeping")
+			return handler(ctx, req)
+		}
+
+		key := m.getRepoKey(targetRepo)
+
+		if methodInfo.Operation == protoregistry.OpMaintenance {
+			m.mu.Lock()
+
+			if m.isActive(key) {
+				m.mu.Unlock()
+				return nil, structerr.NewAlreadyExists("housekeeping already executing for repository")
+			}
+			m.markHousekeepingActive(key)
+
+			m.mu.Unlock()
+
+			resp, err := handler(ctx, req)
+
+			m.markHousekeepingInactive(key)
+
+			return resp, err
+		}
+
+		if methodInfo.Operation == protoregistry.OpMutator {
+			// Execute the handler first so that housekeeping incorporates the latest writes. We also ensure that
+			// the scheduling logic doesn't run for invalid requests.
+			resp, err := handler(ctx, req)
+			if err != nil {
+				return resp, err
+			}
+
+			m.scheduleHousekeeping(ctx, targetRepo)
+			return resp, err
+		}
+
+		return handler(ctx, req)
+	}
+}
+
+// StreamServerInterceptor returns gRPC stream request middleware.
+func (m *Middleware) StreamServerInterceptor() grpc.StreamServerInterceptor {
+	return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		if featureflag.HousekeepingMiddleware.IsDisabled(ss.Context()) {
+			return handler(srv, ss)
+		}
+
+		methodInfo, err := m.registry.LookupMethod(info.FullMethod)
+		if err != nil {
+			m.logger.WithError(err).ErrorContext(ss.Context(), "lookup method for housekeeping")
+			return handler(srv, ss)
+		}
+
+		req := methodInfo.NewRequest()
+		if err := ss.RecvMsg(req); err != nil {
+			m.logger.WithError(err).ErrorContext(ss.Context(), "peek first request for housekeeping")
+			return handler(srv, middleware.NewPeekedStream(ss.Context(), nil, err, ss))
+		}
+
+		targetRepo, err := methodInfo.TargetRepo(req)
+		if err != nil {
+			m.logger.WithError(err).ErrorContext(ss.Context(), "lookup target repository for housekeeping")
+			return handler(srv, middleware.NewPeekedStream(ss.Context(), req, nil, ss))
+		}
+
+		key := m.getRepoKey(targetRepo)
+
+		if methodInfo.Operation == protoregistry.OpMaintenance {
+			m.mu.Lock()
+			if m.isActive(key) {
+				m.mu.Unlock()
+				return structerr.NewAlreadyExists("housekeeping already executing for repository")
+			}
+
+			m.markHousekeepingActive(key)
+			m.mu.Unlock()
+
+			// Ensure that the first message we consumed earlier is replayed to the handler.
+			err = handler(srv, middleware.NewPeekedStream(ss.Context(), req, nil, ss))
+
+			m.markHousekeepingInactive(key)
+
+			return err
+		}
+
+		if methodInfo.Operation == protoregistry.OpMutator {
+			// Execute the handler first so that housekeeping incorporates the latest writes. We also ensure that
+			// the scheduling logic doesn't run for invalid requests.
+			if err := handler(srv, middleware.NewPeekedStream(ss.Context(), req, nil, ss)); err != nil {
+				return err
+			}
+
+			m.scheduleHousekeeping(ss.Context(), targetRepo)
+			return nil
+		}
+
+		// Ensure that the first message we consumed earlier is replayed to the handler.
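+		// Accessors and any remaining operations need no scheduling and are passed
+		// straight through.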
+		return handler(srv, middleware.NewPeekedStream(ss.Context(), req, nil, ss))
+	}
+}
+
+// markHousekeepingActive flags housekeeping as running for the given repository.
+// Callers must hold m.mu.
+func (m *Middleware) markHousekeepingActive(key repoKey) {
+	a, ok := m.repoActivity[key]
+	if !ok {
+		a = &activity{}
+		m.repoActivity[key] = a
+	}
+
+	a.active = true
+	// Reset the counter at the start so we can track the number of write RPCs that executed while a housekeeping
+	// job is active.
+	a.writeCount = 0
+}
+
+func (m *Middleware) markHousekeepingInactive(key repoKey) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	a, ok := m.repoActivity[key]
+	if !ok {
+		a = &activity{}
+		m.repoActivity[key] = a
+	}
+
+	// Since we reset the counter at the beginning of housekeeping, if the counter remains at 0 after housekeeping
+	// it means the repository is low-activity. We can remove the entry in the map if so.
+	if a.writeCount == 0 {
+		delete(m.repoActivity, key)
+		return
+	}
+
+	a.active = false
+}
+
+// isActive reports whether housekeeping is currently running for the given
+// repository. Callers must hold m.mu.
+func (m *Middleware) isActive(key repoKey) bool {
+	a, ok := m.repoActivity[key]
+	if !ok {
+		return false
+	}
+
+	return a.active
+}
+
+func (m *Middleware) scheduleHousekeeping(ctx context.Context, repo *gitalypb.Repository) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	key := m.getRepoKey(repo)
+
+	a, ok := m.repoActivity[key]
+	if !ok {
+		a = &activity{}
+		m.repoActivity[key] = a
+	}
+	a.writeCount++
+
+	if a.writeCount <= m.interval || a.active {
+		return
+	}
+
+	m.logger.InfoContext(ctx, "beginning scheduled housekeeping")
+
+	m.markHousekeepingActive(key)
+
+	// Detach the housekeeping run from the request's cancellation so that it is
+	// not aborted as soon as the RPC that scheduled it returns.
+	ctx = context.WithoutCancel(ctx)
+
+	m.wg.Add(1)
+	go func() {
+		defer func() {
+			m.markHousekeepingInactive(key)
+
+			m.logger.InfoContext(ctx, "ended scheduled housekeeping")
+			m.wg.Done()
+		}()
+
+		localRepo := m.localRepoFactory.Build(repo)
+		if err := m.manager.OptimizeRepository(ctx, localRepo, manager.WithOptimizationStrategyConstructor(
+			func(info stats.RepositoryInfo) housekeeping.OptimizationStrategy {
+				return housekeeping.NewHeuristicalOptimizationStrategy(info)
+			},
+		)); err != nil {
+			m.logger.WithError(err).ErrorContext(ctx, "failed scheduled housekeeping")
+		}
+	}()
+}
+
+func (m *Middleware) getRepoKey(repo *gitalypb.Repository) repoKey {
+	return repoKey{storage: repo.GetStorageName(), relativePath: repo.GetRelativePath()}
+}
diff --git a/internal/grpc/middleware/housekeeping/middleware_test.go b/internal/grpc/middleware/housekeeping/middleware_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..54b02d7d557e9d75b94f9628b710f7662d686454
--- /dev/null
+++ b/internal/grpc/middleware/housekeeping/middleware_test.go
@@ -0,0 +1,395 @@
+package housekeeping
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/config"
+	housekeepingmgr "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/manager"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
+	gitalycfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+)
+
+type testService struct {
+	gitalypb.UnimplementedRepositoryServiceServer
+}
+
+// Mutator, Unary
+func (ts *testService) WriteRef(context.Context, *gitalypb.WriteRefRequest) (*gitalypb.WriteRefResponse, error) {
+	return &gitalypb.WriteRefResponse{}, nil
+}
+
+// Mutator, Stream, Erroring
+func (ts *testService) CreateRepositoryFromBundle(grpc.ClientStreamingServer[gitalypb.CreateRepositoryFromBundleRequest, gitalypb.CreateRepositoryFromBundleResponse]) error {
+	return fmt.Errorf("designed to error")
+}
+
+// Accessor, Unary
+func (ts *testService) RepositoryExists(context.Context, *gitalypb.RepositoryExistsRequest) (*gitalypb.RepositoryExistsResponse, error) {
+	return &gitalypb.RepositoryExistsResponse{}, nil
+}
+
+// Accessor, Stream
+func (ts *testService) GetArchive(*gitalypb.GetArchiveRequest, grpc.ServerStreamingServer[gitalypb.GetArchiveResponse]) error {
+	return nil
+}
+
+// Accessor, Unary, Erroring
+func (ts *testService) RepositoryInfo(context.Context, *gitalypb.RepositoryInfoRequest) (*gitalypb.RepositoryInfoResponse, error) {
+	return &gitalypb.RepositoryInfoResponse{}, fmt.Errorf("designed to error")
+}
+
+// Maintenance, Unary
+func (ts *testService) OptimizeRepository(context.Context, *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) {
+	return nil, nil
+}
+
+// Maintenance, Unary
+func (ts *testService) PruneUnreachableObjects(context.Context, *gitalypb.PruneUnreachableObjectsRequest) (*gitalypb.PruneUnreachableObjectsResponse, error) {
+	return nil, nil
+}
+
+type mockHousekeepingManager struct {
+	optimizeRepositoryInvocations map[string]int // RelativePath -> count
+	mu                            sync.Mutex
+
+	useDelayCh bool
+	delayCh    chan struct{}
+}
+
+func (m *mockHousekeepingManager) getOptimizeRepositoryInvocations(relativePath string) int {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	return m.optimizeRepositoryInvocations[relativePath]
+}
+
+func (m *mockHousekeepingManager) withDelay() chan struct{} {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	m.delayCh = make(chan struct{})
+	m.useDelayCh = true
+
+	return m.delayCh
+}
+
+func (m *mockHousekeepingManager) withoutDelay() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	m.useDelayCh = false
+}
+
+func (m *mockHousekeepingManager) CleanStaleData(context.Context, *localrepo.Repo, housekeeping.CleanStaleDataConfig) error {
+	return nil
+}
+
+func (m *mockHousekeepingManager) OptimizeRepository(ctx context.Context, repo *localrepo.Repo, opts ...housekeepingmgr.OptimizeRepositoryOption) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	relativePath := repo.GetRelativePath()
+	if _, ok := m.optimizeRepositoryInvocations[relativePath]; !ok {
+		m.optimizeRepositoryInvocations[relativePath] = 0
+	}
+
+	m.optimizeRepositoryInvocations[relativePath]++
+
+	if m.useDelayCh {
+		<-m.delayCh
+	}
+
+	return nil
+}
+
+func (m *mockHousekeepingManager) AddPackRefsInhibitor(ctx context.Context, repo storage.Repository) (bool, func(), error) {
+	return false, nil, nil
+}
+
+func (m *mockHousekeepingManager) OffloadRepository(context.Context, *localrepo.Repo, config.OffloadingConfig) error {
+	return nil
+}
+
+func TestInterceptors(t *testing.T) {
+	testhelper.NewFeatureSets(
+		featureflag.HousekeepingMiddleware,
+	).Run(t, testInterceptors)
+}
+
+func testInterceptors(t *testing.T, ctx context.Context) {
+	cfg := testcfg.Build(t)
+	logger := testhelper.NewLogger(t)
+	catfileCache := catfile.NewCache(cfg)
+	t.Cleanup(catfileCache.Stop)
+
+	localRepoFactory := localrepo.NewFactory(logger, gitalycfg.NewLocator(cfg), gittest.NewCommandFactory(t, cfg), catfileCache)
+
+	housekeepingManager := &mockHousekeepingManager{
+		optimizeRepositoryInvocations: make(map[string]int),
+		delayCh:                       make(chan struct{}),
+	}
+
+	housekeepingMiddleware := NewHousekeepingMiddleware(logger, protoregistry.GitalyProtoPreregistered, localRepoFactory, housekeepingManager, 1)
+	defer housekeepingMiddleware.WaitForWorkers()
+
+	server := grpc.NewServer(
+		grpc.StreamInterceptor(housekeepingMiddleware.StreamServerInterceptor()),
+		grpc.UnaryInterceptor(housekeepingMiddleware.UnaryServerInterceptor()),
+	)
+	t.Cleanup(server.Stop)
+
+	service := &testService{}
+
+	gitalypb.RegisterRepositoryServiceServer(server, service)
+
+	listener, err := net.Listen("tcp", ":0")
+	require.NoError(t, err)
+	go func() {
+		testhelper.MustServe(t, server, listener)
+	}()
+
+	conn, err := grpc.NewClient(
+		listener.Addr().String(),
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+	)
+	require.NoError(t, err)
+	defer testhelper.MustClose(t, conn)
+
+	t.Run("when unary mutator RPCs are intercepted", func(t *testing.T) {
+		repo := &gitalypb.Repository{
+			RelativePath: "myrepo1",
+		}
+
+		sendFn := func() {
+			_, err = gitalypb.NewRepositoryServiceClient(conn).WriteRef(ctx, &gitalypb.WriteRefRequest{
+				Repository: repo,
+			})
+			require.NoError(t, err)
+		}
+
+		sendFn()
+
+		housekeepingMiddleware.WaitForWorkers()
+		require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations under the interval")
+
+		sendFn()
+
+		housekeepingMiddleware.WaitForWorkers()
+		require.Equal(t, testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 1, 0), housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "one invocation after the interval")
+
+		for range 2 {
+			sendFn()
+		}
+
+		housekeepingMiddleware.WaitForWorkers()
+		require.Equal(t, testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 2, 0), housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "another invocation after the interval")
+	})
+
+	t.Run("when unary accessor RPCs are intercepted", func(t *testing.T) {
+		repo := &gitalypb.Repository{
+			RelativePath: "myrepo2",
+		}
+
+		sendFn := func() {
+			_, err = gitalypb.NewRepositoryServiceClient(conn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+				Repository: repo,
+			})
+			require.NoError(t, err)
+		}
+
+		sendFn()
+
+		housekeepingMiddleware.WaitForWorkers()
+		require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations under the interval")
+
+		sendFn()
+
+		housekeepingMiddleware.WaitForWorkers()
+		require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations after the interval")
+	})
+
+	t.Run("when stream accessor RPCs are intercepted", func(t *testing.T) {
+		repo := &gitalypb.Repository{
+			RelativePath: "myrepo3",
+		}
+
+		sendFn := func() {
+			stream, err := gitalypb.NewRepositoryServiceClient(conn).GetArchive(ctx, &gitalypb.GetArchiveRequest{
+				Repository: repo,
+			})
+			require.NoError(t, err)
+			require.NoError(t, stream.CloseSend())
+		}
+
+		sendFn()
+
+		housekeepingMiddleware.WaitForWorkers()
+		require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations under the interval")
+
+		sendFn()
+
+		housekeepingMiddleware.WaitForWorkers()
+		require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations after the interval")
+	})
+
+	t.Run("when an erroring RPC is intercepted", func(t *testing.T) {
+		repo := &gitalypb.Repository{
+			RelativePath: "myrepo4",
+		}
+
+		for range 2 {
+			_, err = gitalypb.NewRepositoryServiceClient(conn).RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{
+				Repository: repo,
+			})
+			require.EqualError(t, err, "rpc error: code = Unknown desc = designed to error", "middleware preserves the original error")
+		}
+
+		housekeepingMiddleware.WaitForWorkers()
+		require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations after the interval")
+
+		for range 2 {
+			stream, err := gitalypb.NewRepositoryServiceClient(conn).CreateRepositoryFromBundle(ctx)
+			require.NoError(t, err)
+			require.NoError(t, stream.Send(&gitalypb.CreateRepositoryFromBundleRequest{
+				Repository: repo,
+			}))
+
+			_, err = stream.CloseAndRecv()
+			require.EqualError(t, err, "rpc error: code = Unknown desc = designed to error", "middleware preserves the original error")
+		}
+
+		housekeepingMiddleware.WaitForWorkers()
+		require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations after the interval")
+	})
+
+	t.Run("when the OptimizeRepository RPC is invoked", func(t *testing.T) {
+		repo := &gitalypb.Repository{
+			RelativePath: "myrepo5",
+		}
+
+		for range 2 {
+			_, err = gitalypb.NewRepositoryServiceClient(conn).OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{
+				Repository: repo,
+			})
+			require.NoError(t, err)
+		}
+
+		housekeepingMiddleware.WaitForWorkers()
+		require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "does not schedule further housekeeping")
+	})
+
+	t.Run("when the PruneUnreachableObjects RPC is invoked", func(t *testing.T) {
+		repo := &gitalypb.Repository{
+			RelativePath: "myrepo6",
+		}
+
+		for range 2 {
+			_, err = gitalypb.NewRepositoryServiceClient(conn).PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{
+				Repository: repo,
+			})
+			require.NoError(t, err)
+		}
+
+		housekeepingMiddleware.WaitForWorkers()
+		require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "does not schedule further housekeeping")
+	})
+
+	t.Run("when a housekeeping task is active when a maintenance RPC is received", func(t *testing.T) {
+		repo := &gitalypb.Repository{
+			RelativePath: "myrepo7",
+		}
+
+		ch := housekeepingManager.withDelay()
+		defer housekeepingManager.withoutDelay()
+
+		for range 2 {
+			_, err = gitalypb.NewRepositoryServiceClient(conn).WriteRef(ctx, &gitalypb.WriteRefRequest{
+				Repository: repo,
+			})
+			require.NoError(t, err)
+		}
+
+		_, err = gitalypb.NewRepositoryServiceClient(conn).OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{
+			Repository: repo,
+		})
+
+		if featureflag.HousekeepingMiddleware.IsEnabled(ctx) {
+			require.EqualError(t, err, "rpc error: code = AlreadyExists desc = housekeeping already executing for repository")
+		} else {
+			require.NoError(t, err)
+		}
+
+		close(ch)
+	})
+
+	t.Run("when a maintenance RPC is active and the write interval is reached", func(t *testing.T) {
+		repo := &gitalypb.Repository{
+			RelativePath: "myrepo8",
+		}
+
+		ch := housekeepingManager.withDelay()
+		defer housekeepingManager.withoutDelay()
+
+		_, err = gitalypb.NewRepositoryServiceClient(conn).OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{
+			Repository: repo,
+		})
+		require.NoError(t, err)
+
+		for range 2 {
+			_, err = gitalypb.NewRepositoryServiceClient(conn).WriteRef(ctx, &gitalypb.WriteRefRequest{
+				Repository: repo,
+			})
+			require.NoError(t, err)
+		}
+
+		close(ch)
+
+		require.Equal(t, testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 1, 0), housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "one invocation after the interval")
+	})
+
+	t.Run("when the write interval is reached again when housekeeping is active", func(t *testing.T) {
+		repo := &gitalypb.Repository{
+			RelativePath: "myrepo9",
+		}
+
+		sendFn := func() {
+			_, err = gitalypb.NewRepositoryServiceClient(conn).WriteRef(ctx, &gitalypb.WriteRefRequest{
+				Repository: repo,
+			})
+			require.NoError(t, err)
+		}
+
+		ch := housekeepingManager.withDelay()
+		defer housekeepingManager.withoutDelay()
+
+		// The first two requests will trigger housekeeping that runs until ch is closed.
+		// The next two requests won't trigger housekeeping as there's already an active job.
+		for range 4 {
+			sendFn()
+		}
+
+		// Release the active housekeeping job and wait for it to be marked inactive
+		// so that the next write is able to schedule a new run.
+		close(ch)
+		housekeepingMiddleware.WaitForWorkers()
+
+		// The next request triggers housekeeping as the counter has already incremented past the interval.
+		sendFn()
+
+		housekeepingMiddleware.WaitForWorkers()
+		require.Equal(t, testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 2, 0), housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "another invocation after the interval")
+	})
+}
diff --git a/internal/grpc/middleware/housekeeping/testhelper_test.go b/internal/grpc/middleware/housekeeping/testhelper_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..e41506673ee33550066a3ce896c3cf8b92441e8e
--- /dev/null
+++ b/internal/grpc/middleware/housekeeping/testhelper_test.go
@@ -0,0 +1,11 @@
+package housekeeping
+
+import (
+	"testing"
+
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+)
+
+func TestMain(m *testing.M) {
+	testhelper.Run(m)
+}
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 98e8cd126e5c128d05d39242f03035a552d75c79..dffbc22ed704d1a2a0851d4dd13860f0ae7c65a5 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -216,6 +216,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d
 		deps.GetBackchannelRegistry(),
 		deps.GetDiskCache(),
 		[]*limithandler.LimiterMiddleware{deps.GetLimitHandler()},
+		nil,
 		txMiddleware,
 	)