diff --git a/internal/gitaly/client/dial.go b/internal/gitaly/client/dial.go index def5b5fad75ce8d85a689ed41156ec4c61b51892..bc8cf0bca3e5f63a6baf7d80b28f14746fa11f9f 100644 --- a/internal/gitaly/client/dial.go +++ b/internal/gitaly/client/dial.go @@ -205,6 +205,7 @@ func defaultServiceConfig() string { LoadBalancingConfig: []*gitalypb.LoadBalancingConfig{{ Policy: &gitalypb.LoadBalancingConfig_RoundRobin{}, }}, + MethodConfig: defaultMethodConfigs(), } configJSON, err := protojson.Marshal(serviceConfig) if err != nil { @@ -213,3 +214,12 @@ func defaultServiceConfig() string { return string(configJSON) } + +func defaultMethodConfigs() []*gitalypb.MethodConfig { + var configs []*gitalypb.MethodConfig + + // Add the list of method configs for RPC pushback + configs = append(configs, limithandler.DefaultPushbackMethodConfigs()...) + + return configs +} diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go index c532943a47bcd15bf9963103421402c5b23ae377..632bfb1e9f360d346dea837d3881a54d2f9a26d3 100644 --- a/internal/middleware/limithandler/middleware.go +++ b/internal/middleware/limithandler/middleware.go @@ -42,6 +42,7 @@ type LimiterMiddleware struct { getLockKey GetLockKey requestsDroppedMetric *prometheus.CounterVec collect func(metrics chan<- prometheus.Metric) + pushback *pushback } // New creates a new middleware that limits requests. 
SetupFunc sets up the @@ -61,6 +62,7 @@ func New(cfg config.Cfg, getLockKey GetLockKey, setupMiddleware SetupFunc) *Limi "reason", }, ), + pushback: &pushback{policies: defaultPushbackPolicies()}, } setupMiddleware(cfg, middleware) @@ -95,9 +97,13 @@ func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor { return handler(ctx, req) } - return limiter.Limit(ctx, lockKey, func() (interface{}, error) { + response, err := limiter.Limit(ctx, lockKey, func() (interface{}, error) { return handler(ctx, req) }) + if err != nil { + c.pushback.push(ctx, info.FullMethod, err) + } + return response, err } } @@ -163,6 +169,9 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { // It's our turn! return nil case err := <-errs: + if err != nil { + w.limiterMiddleware.pushback.push(ctx, w.info.FullMethod, err) + } return err } } diff --git a/internal/middleware/limithandler/pushback.go b/internal/middleware/limithandler/pushback.go new file mode 100644 index 0000000000000000000000000000000000000000..d9726a178fb6b23af616d19b5df8b6b9f5983af9 --- /dev/null +++ b/internal/middleware/limithandler/pushback.go @@ -0,0 +1,181 @@ +package limithandler + +import ( + "context" + "errors" + "fmt" + "math/rand" + "strconv" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "gitlab.com/gitlab-org/gitaly/v15/internal/backoff" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v15/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/durationpb" +) + +const ( + // GrpcPushbackHeader is the key for gRPC response header that defines the duration in + // milliseconds a client should back off before re-sending the request. This header is + // only effective if there is a retry policy configuration for the RPC. 
The retry policy + // must declare MaxAttempts and RetryableStatusCodes. This pushback duration has higher + // precedence than retry settings in the retry policy. + // + // When a client receives this header, it is forced to sleep. As this feature is supported + // in gRPC library implementations, clients can't refuse. It's recommended to add this + // header to protect most critical, resource-hungry RPCs, such as UploadPack, PackObject. + // Please be nice! + // + // For more information, visit https://github.com/grpc/proposal/blob/master/A6-client-retries.md#pushback + GrpcPushbackHeader = "grpc-retry-pushback-ms" + // GrpcPreviousAttempts defines the key of gRPC request header that stores the number of + // previous attempts of the same RPC. This number counts "transparent" failures and failures + // matching RetryableStatusCodes retry config. This counter stays in the library layer. If + // the application layer performs the retry, this value is always set to 0. + GrpcPreviousAttempts = "grpc-previous-rpc-attempts" +) + +// grpcPushbackMaxAttempt defines the maximum attempt a client should retry. Other types of +// transient failures, such as network failures, are also taken into account. +var grpcPushbackMaxAttempt = 3 + +// grpcPushbackRetryableStatusCodes defines the list of gRPC codes a client should perform retry +// automatically. The status codes are capitalized SNAKE_CASE. The following link contains the list +// of all codes: https://grpc.github.io/grpc/core/md_doc_statuscodes.html +var grpcPushbackRetryableStatusCodes = []string{"RESOURCE_EXHAUSTED"} + +// Exponential backoff parameters +var ( + initialBackoff = 5 * time.Second + maxBackoff = 60 * time.Second + backoffMultiplier = 2.0 +) + +// RPCsWithPushbackHeaders defines the list of Gitaly RPCs to add pushback support. Please note +// that the list must only include external RPCs from clients. 
Adding an internal RPC, such as +// /gitaly.HookService/PackObjectsHookWithSidechannel, makes Gitaly push back on itself. +var RPCsWithPushbackHeaders = []string{ + "/gitaly.SSHService/SSHUploadPackWithSidechannel", + "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel", +} + +func init() { + for _, rpc := range RPCsWithPushbackHeaders { + if _, err := protoregistry.GitalyProtoPreregistered.LookupMethod(rpc); err != nil { + panic(fmt.Errorf("RPC not found: %s", rpc)) + } + } +} + +// DefaultPushbackMethodConfigs returns the list of gRPC method configs. Each method config includes +// name of RPC with pushback enabled, maximum attempts, and retryable status codes. This list should +// be added to client Service Config when it dials. +func DefaultPushbackMethodConfigs() []*gitalypb.MethodConfig { + var configs []*gitalypb.MethodConfig + + for _, rpc := range RPCsWithPushbackHeaders { + // Method is validated when this package is loaded + mth, _ := protoregistry.GitalyProtoPreregistered.LookupMethod(rpc) + + serviceName, methodName := mth.ServiceNameAndMethodName() + configs = append(configs, &gitalypb.MethodConfig{ + Name: []*gitalypb.MethodConfig_Name{{ + Service: serviceName, + Method: methodName, + }}, + // When the pushback header grpc-retry-pushback-ms is specified, the client uses that + // value. Other exponential backoff parameters in RetryPolicy are ignored. + // We supply them here to make the config valid + RetryPolicy: &gitalypb.MethodConfig_RetryPolicy{ + MaxAttempts: uint32(grpcPushbackMaxAttempt), + RetryableStatusCodes: grpcPushbackRetryableStatusCodes, + InitialBackoff: durationpb.New(initialBackoff), + MaxBackoff: durationpb.New(maxBackoff), + BackoffMultiplier: float32(backoffMultiplier), + }, + }) + } + return configs +} + +// newLimitErrorBackoff returns an exponential backoff strategy when facing limit error. The backoff +// parameters are adjusted much longer and steeper than normal networking failure. 
When Gitaly node +// is saturated and starts to push-back traffic, it takes a lot of time for the situation to go +// away. The node can either wait for more room to breathe or terminate in-flight requests. Either +// way, it does not make sense for clients to retry after short delays. +// +// | Retries | Delay before random jitter | +// | ------- | -------------------------- | +// | 0 | 5 seconds | +// | 1 | 10 seconds | +// | 2 | 20 seconds | +// | 3 | 40 seconds | +func newLimitErrorBackoff() backoff.Strategy { + exponential := backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano()))) + exponential.BaseDelay = initialBackoff + exponential.MaxDelay = maxBackoff + exponential.Multiplier = backoffMultiplier + return exponential +} + +func defaultPushbackPolicies() map[string]backoff.Strategy { + policies := map[string]backoff.Strategy{} + for _, rpc := range RPCsWithPushbackHeaders { + policies[rpc] = newLimitErrorBackoff() + } + return policies +} + +type pushback struct { + policies map[string]backoff.Strategy +} + +func (p *pushback) push(ctx context.Context, fullMethod string, err error) { + if !errors.Is(err, ErrMaxQueueSize) && !errors.Is(err, ErrMaxQueueTime) { + return + } + var strategy backoff.Strategy + strategy, exist := p.policies[fullMethod] + if !exist { + return + } + + var attempts uint + if strAttempts := metadata.ValueFromIncomingContext(ctx, GrpcPreviousAttempts); len(strAttempts) > 0 { + parsedAttempts, err := strconv.ParseInt(strAttempts[0], 10, 32) + if err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("fail to parse gRPC previous retry attempts") + return + } + attempts = uint(parsedAttempts) + } + + pushbackDuration := strategy.Backoff(attempts) + p.setResponseHeader(ctx, pushbackDuration) + p.setErrorDetail(err, pushbackDuration) +} + +func (p *pushback) setResponseHeader(ctx context.Context, pushbackDuration time.Duration) { + if err := grpc.SetTrailer(ctx, metadata.MD{GrpcPushbackHeader: 
[]string{fmt.Sprintf("%d", pushbackDuration.Milliseconds())}}); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("fail to set gRPC push-back header") + } +} + +func (p *pushback) setErrorDetail(err error, pushbackDuration time.Duration) { + var structErr structerr.Error + if errors.As(err, &structErr) { + for _, detail := range structErr.Details() { + if limitError, ok := detail.(*gitalypb.LimitError); ok { + // The underlying layers can specify its own RetryAfter value. The + // middleware should honor that decision. + if limitError.RetryAfter.AsDuration() == time.Duration(0) { + limitError.RetryAfter = durationpb.New(pushbackDuration) + } + } + } + } +} diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go index 5cb15bbc5f053fcf0e7225b8c8a817981a0baaed..b12e4f46dddf29cfb521fe45a784a55fe4113709 100644 --- a/internal/praefect/protoregistry/protoregistry.go +++ b/internal/praefect/protoregistry/protoregistry.go @@ -22,6 +22,148 @@ func init() { } } +// OpType represents the operation type for a RPC method +type OpType int + +const ( + // OpUnknown = unknown operation type + OpUnknown OpType = iota + // OpAccessor = accessor operation type (ready only) + OpAccessor + // OpMutator = mutator operation type (modifies a repository) + OpMutator + // OpMaintenance is an operation which performs maintenance-tasks on the repository. It + // shouldn't ever result in a user-visible change in behaviour, except that it may repair + // corrupt data. 
+ OpMaintenance +) + +// Scope represents the intended scope of an RPC method +type Scope int + +const ( + // ScopeUnknown is the default scope until determined otherwise + ScopeUnknown Scope = iota + // ScopeRepository indicates an RPC's scope is limited to a repository + ScopeRepository + // ScopeStorage indicates an RPC is scoped to an entire storage location + ScopeStorage +) + +func (s Scope) String() string { + switch s { + case ScopeStorage: + return "storage" + case ScopeRepository: + return "repository" + default: + return fmt.Sprintf("N/A: %d", s) + } +} + +var protoScope = map[gitalypb.OperationMsg_Scope]Scope{ + gitalypb.OperationMsg_REPOSITORY: ScopeRepository, + gitalypb.OperationMsg_STORAGE: ScopeStorage, +} + +// MethodInfo contains metadata about the RPC method. Refer to documentation +// for message type "OperationMsg" shared.proto in ./proto for +// more documentation. +type MethodInfo struct { + Operation OpType + Scope Scope + targetRepo []int + additionalRepo []int + requestName string // protobuf message name for input type + requestFactory protoFactory + storage []int + fullMethodName string +} + +// TargetRepo returns the target repository for a protobuf message if it exists +func (mi MethodInfo) TargetRepo(msg proto.Message) (*gitalypb.Repository, error) { + return mi.getRepo(msg, mi.targetRepo) +} + +// AdditionalRepo returns the additional repository for a protobuf message that needs a storage rewritten +// if it exists +func (mi MethodInfo) AdditionalRepo(msg proto.Message) (*gitalypb.Repository, bool, error) { + if mi.additionalRepo == nil { + return nil, false, nil + } + + repo, err := mi.getRepo(msg, mi.additionalRepo) + + return repo, true, err +} + +//nolint:revive // This is unintentionally missing documentation. +func (mi MethodInfo) FullMethodName() string { + return mi.fullMethodName +} + +// ServiceNameAndMethodName returns a tuple of service name and method name. The service name +// includes its package name. 
+func (mi MethodInfo) ServiceNameAndMethodName() (string, string) { + parts := strings.SplitN(strings.TrimPrefix(mi.fullMethodName, "/"), "/", 2) + if len(parts) < 2 { + return "", "" + } + return parts[0], parts[1] +} + +func (mi MethodInfo) getRepo(msg proto.Message, targetOid []int) (*gitalypb.Repository, error) { + if mi.requestName != string(proto.MessageName(msg)) { + return nil, fmt.Errorf( + "proto message %s does not match expected RPC request message %s", + proto.MessageName(msg), mi.requestName, + ) + } + + repo, err := reflectFindRepoTarget(msg, targetOid) + switch { + case err != nil: + return nil, err + case repo == nil: + // it is possible for the target repo to not be set (especially in our unit + // tests designed to fail and this should return an error to prevent nil + // pointer dereferencing + return nil, ErrTargetRepoMissing + default: + return repo, nil + } +} + +// Storage returns the storage name for a protobuf message if it exists +func (mi MethodInfo) Storage(msg proto.Message) (string, error) { + if mi.requestName != string(proto.MessageName(msg)) { + return "", fmt.Errorf( + "proto message %s does not match expected RPC request message %s", + proto.MessageName(msg), mi.requestName, + ) + } + + return reflectFindStorage(msg, mi.storage) +} + +// SetStorage sets the storage name for a protobuf message +func (mi MethodInfo) SetStorage(msg proto.Message, storage string) error { + if mi.requestName != string(proto.MessageName(msg)) { + return fmt.Errorf( + "proto message %s does not match expected RPC request message %s", + proto.MessageName(msg), mi.requestName, + ) + } + + return reflectSetStorage(msg, mi.storage, storage) +} + +// UnmarshalRequestProto will unmarshal the bytes into the method's request +// message type +func (mi MethodInfo) UnmarshalRequestProto(b []byte) (proto.Message, error) { + return mi.requestFactory(b) +} + // Registry contains info about RPC methods type Registry struct { protos map[string]MethodInfo diff --git 
a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go index 39d24e1f78b0fe8dc6ac5bb502e6230964abd1a1..0b370e763627709eb182f21d3776bac10af059eb 100644 --- a/internal/praefect/protoregistry/protoregistry_test.go +++ b/internal/praefect/protoregistry/protoregistry_test.go @@ -146,8 +146,13 @@ func TestNewProtoRegistry(t *testing.T) { require.NoError(t, err) require.Equalf(t, opType, methodInfo.Operation, "expect %s:%s to have the correct op type", serviceName, methodName) + require.Equal(t, method, methodInfo.FullMethodName()) - require.False(t, GitalyProtoPreregistered.IsInterceptedMethod(method), method) + actualServiceName, actualMethodName := methodInfo.ServiceNameAndMethodName() + require.Equal(t, fmt.Sprintf("gitaly.%s", serviceName), actualServiceName) + require.Equal(t, methodName, actualMethodName) + + require.False(t, protoregistry.GitalyProtoPreregistered.IsInterceptedMethod(method), method) } } }