From e65d5c4597be6d0faf5353a72d0951910ddd97e6 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Fri, 21 Apr 2023 17:11:58 +0700 Subject: [PATCH 1/2] Add a function to protoregistry to retrieve service name and method name protoregistry maintains a registry containing the list of RPC methods and their reflection. protoregistry.methodInfo returns the full name of a method. This commit adds one more function to get the service name (including package name) and method name separately. --- .../praefect/protoregistry/protoregistry.go | 142 ++++++++++++++++++ .../protoregistry/protoregistry_test.go | 7 +- 2 files changed, 148 insertions(+), 1 deletion(-) diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go index 5cb15bbc5f..b12e4f46dd 100644 --- a/internal/praefect/protoregistry/protoregistry.go +++ b/internal/praefect/protoregistry/protoregistry.go @@ -22,6 +22,148 @@ func init() { } } +// OpType represents the operation type for a RPC method +type OpType int + +const ( + // OpUnknown = unknown operation type + OpUnknown OpType = iota + // OpAccessor = accessor operation type (ready only) + OpAccessor + // OpMutator = mutator operation type (modifies a repository) + OpMutator + // OpMaintenance is an operation which performs maintenance-tasks on the repository. It + // shouldn't ever result in a user-visible change in behaviour, except that it may repair + // corrupt data. + OpMaintenance +) + +// Scope represents the intended scope of an RPC method +type Scope int + +const ( + // ScopeUnknown is the default scope until determined otherwise + ScopeUnknown Scope = iota + // ScopeRepository indicates an RPC's scope is limited to a repository + ScopeRepository + // ScopeStorage indicates an RPC is scoped to an entire storage location + ScopeStorage +) + +func (s Scope) String() string { + switch s { + case ScopeStorage: + return "storage" + case ScopeRepository: + return "repository" + default: + return fmt.Sprintf("N/A: %d", s) + } +} + +var protoScope = map[gitalypb.OperationMsg_Scope]Scope{ + gitalypb.OperationMsg_REPOSITORY: ScopeRepository, + gitalypb.OperationMsg_STORAGE: ScopeStorage, +} + +// MethodInfo contains metadata about the RPC method. Refer to documentation +// for message type "OperationMsg" shared.proto in ./proto for +// more documentation. +type MethodInfo struct { + Operation OpType + Scope Scope + targetRepo []int + additionalRepo []int + requestName string // protobuf message name for input type + requestFactory protoFactory + storage []int + fullMethodName string +} + +// TargetRepo returns the target repository for a protobuf message if it exists +func (mi MethodInfo) TargetRepo(msg proto.Message) (*gitalypb.Repository, error) { + return mi.getRepo(msg, mi.targetRepo) +} + +// AdditionalRepo returns the additional repository for a protobuf message that needs a storage rewritten +// if it exists +func (mi MethodInfo) AdditionalRepo(msg proto.Message) (*gitalypb.Repository, bool, error) { + if mi.additionalRepo == nil { + return nil, false, nil + } + + repo, err := mi.getRepo(msg, mi.additionalRepo) + + return repo, true, err +} + +//nolint:revive // This is unintentionally missing documentation. +func (mi MethodInfo) FullMethodName() string { + return mi.fullMethodName +} + +// ServiceNameAndMethodName returns a tuple of service name and method name. The service name +// includes its package name. +func (mi MethodInfo) ServiceNameAndMethodName() (string, string) { + parts := strings.SplitN(strings.TrimPrefix(mi.fullMethodName, "/"), "/", 2) + if len(parts) < 2 { + return "", "" + } + return parts[0], parts[1] +} + +func (mi MethodInfo) getRepo(msg proto.Message, targetOid []int) (*gitalypb.Repository, error) { + if mi.requestName != string(proto.MessageName(msg)) { + return nil, fmt.Errorf( + "proto message %s does not match expected RPC request message %s", + proto.MessageName(msg), mi.requestName, + ) + } + + repo, err := reflectFindRepoTarget(msg, targetOid) + switch { + case err != nil: + return nil, err + case repo == nil: + // it is possible for the target repo to not be set (especially in our unit + // tests designed to fail and this should return an error to prevent nil + // pointer dereferencing + return nil, ErrTargetRepoMissing + default: + return repo, nil + } +} + +// Storage returns the storage name for a protobuf message if it exists +func (mi MethodInfo) Storage(msg proto.Message) (string, error) { + if mi.requestName != string(proto.MessageName(msg)) { + return "", fmt.Errorf( + "proto message %s does not match expected RPC request message %s", + proto.MessageName(msg), mi.requestName, + ) + } + + return reflectFindStorage(msg, mi.storage) +} + +// SetStorage sets the storage name for a protobuf message +func (mi MethodInfo) SetStorage(msg proto.Message, storage string) error { + if mi.requestName != string(proto.MessageName(msg)) { + return fmt.Errorf( + "proto message %s does not match expected RPC request message %s", + proto.MessageName(msg), mi.requestName, + ) + } + + return reflectSetStorage(msg, mi.storage, storage) +} + +// UnmarshalRequestProto will unmarshal the bytes into the method's request +// message type +func (mi MethodInfo) UnmarshalRequestProto(b []byte) (proto.Message, error) { + return mi.requestFactory(b) +} + // Registry contains info about RPC methods type Registry struct { protos map[string]MethodInfo diff --git a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go index 39d24e1f78..0b370e7636 100644 --- a/internal/praefect/protoregistry/protoregistry_test.go +++ b/internal/praefect/protoregistry/protoregistry_test.go @@ -146,8 +146,13 @@ func TestNewProtoRegistry(t *testing.T) { require.NoError(t, err) require.Equalf(t, opType, methodInfo.Operation, "expect %s:%s to have the correct op type", serviceName, methodName) + require.Equal(t, method, methodInfo.FullMethodName()) - require.False(t, GitalyProtoPreregistered.IsInterceptedMethod(method), method) + actualServiceName, actualMethodName := methodInfo.ServiceNameAndMethodName() + require.Equal(t, fmt.Sprintf("gitaly.%s", serviceName), actualServiceName) + require.Equal(t, methodName, actualMethodName) + + require.False(t, protoregistry.GitalyProtoPreregistered.IsInterceptedMethod(method), method) } } } -- GitLab From f210407370a45486e9ddf11f3b139894cb9e9494 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Fri, 21 Apr 2023 17:15:03 +0700 Subject: [PATCH 2/2] Add `grpc-retry-pushback-ms` header when exceeding concurrency limit We have a concurrency limit for critical resource-hungry RPCs. When a request exceeds the limit, it is rejected with ResourceExhausted status code. This commit adds `grpc-retry-pushback-ms` header to rejection responses. This header is only effective if there is a retry policy configuration for the RPC. The retry policy must declare MaxAttempts and RetryableStatusCodes. This pushback duration has higher precedence than retry settings in the retry policy. When a client receives this header, it is forced to sleep. As this feature is supported in gRPC library implementations, clients can't refuse. It's recommended to add this header to protect most critical, resource-hungry RPCs, such as UploadPack, PackObject. Pushback duration is exponential. It is calculated from the previous retry attempts fo the same request. The retry attempt is extrated from `grpc-previous-rpc-attempts` request header. --- internal/gitaly/client/dial.go | 10 + .../middleware/limithandler/middleware.go | 11 +- internal/middleware/limithandler/pushback.go | 181 ++++++++++++++++++ 3 files changed, 201 insertions(+), 1 deletion(-) create mode 100644 internal/middleware/limithandler/pushback.go diff --git a/internal/gitaly/client/dial.go b/internal/gitaly/client/dial.go index def5b5fad7..bc8cf0bca3 100644 --- a/internal/gitaly/client/dial.go +++ b/internal/gitaly/client/dial.go @@ -205,6 +205,7 @@ func defaultServiceConfig() string { LoadBalancingConfig: []*gitalypb.LoadBalancingConfig{{ Policy: &gitalypb.LoadBalancingConfig_RoundRobin{}, }}, + MethodConfig: defaultMethodConfigs(), } configJSON, err := protojson.Marshal(serviceConfig) if err != nil { @@ -213,3 +214,12 @@ func defaultServiceConfig() string { return string(configJSON) } + +func defaultMethodConfigs() []*gitalypb.MethodConfig { + var configs []*gitalypb.MethodConfig + + // Add the list of method configs for RPCc pushback + configs = append(configs, limithandler.DefaultPushbackMethodConfigs()...) + + return configs +} diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go index c532943a47..632bfb1e9f 100644 --- a/internal/middleware/limithandler/middleware.go +++ b/internal/middleware/limithandler/middleware.go @@ -42,6 +42,7 @@ type LimiterMiddleware struct { getLockKey GetLockKey requestsDroppedMetric *prometheus.CounterVec collect func(metrics chan<- prometheus.Metric) + pushback *pushback } // New creates a new middleware that limits requests. SetupFunc sets up the @@ -61,6 +62,7 @@ func New(cfg config.Cfg, getLockKey GetLockKey, setupMiddleware SetupFunc) *Limi "reason", }, ), + pushback: &pushback{policies: defaultPushbackPolicies()}, } setupMiddleware(cfg, middleware) @@ -95,9 +97,13 @@ func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor { return handler(ctx, req) } - return limiter.Limit(ctx, lockKey, func() (interface{}, error) { + response, err := limiter.Limit(ctx, lockKey, func() (interface{}, error) { return handler(ctx, req) }) + if err != nil { + c.pushback.push(ctx, info.FullMethod, err) + } + return response, err } } @@ -163,6 +169,9 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { // It's our turn! return nil case err := <-errs: + if err != nil { + w.limiterMiddleware.pushback.push(ctx, w.info.FullMethod, err) + } return err } } diff --git a/internal/middleware/limithandler/pushback.go b/internal/middleware/limithandler/pushback.go new file mode 100644 index 0000000000..d9726a178f --- /dev/null +++ b/internal/middleware/limithandler/pushback.go @@ -0,0 +1,181 @@ +package limithandler + +import ( + "context" + "errors" + "fmt" + "math/rand" + "strconv" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "gitlab.com/gitlab-org/gitaly/v15/internal/backoff" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v15/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/durationpb" +) + +const ( + // GrpcPushbackHeader is the key for gRPC response header that defines the duration in + // milliseconds a client should back-off before re-send the request again. This header is + // only effective if there is a retry policy configuration for the RPC. The retry policy + // must declare MaxAttempts and RetryableStatusCodes. This pushback duration has higher + // precedence than retry settings in the retry policy. + // + // When a client receives this header, it is forced to sleep. As this feature is supported + // in gRPC library implementations, clients can't refuse. It's recommended to add this + // header to protect most critical, resource-hungry RPCs, such as UploadPack, PackObject. + // Please be nice! + // + // For more information, visit https://github.com/grpc/proposal/blob/master/A6-client-retries.md#pushback + GrpcPushbackHeader = "grpc-retry-pushback-ms" + // GrpcPreviousAttempts defines the key of gRPC request header that stores the amount of + // previous attempts of the same RPC. This number counts "transparent" failures and failures + // matching RetryableStatusCodes retry config. This counter stays in the library layer. If + // the application layer performs the retry, this value is always set to 0. + GrpcPreviousAttempts = "grpc-previous-rpc-attempts" +) + +// grpcPushbackMaxAttempt defines the maximum attempt a client should retry. Other type of +// transient failures, such as network failures, are also taken into account. +var grpcPushbackMaxAttempt = 3 + +// grpcPushbackRetryableStatusCodes defines the list of gRPC codes a client should perform retry +// automatically. The status codes are capitalized SNAKE_CASE. The following link contains the list +// of all codes: https://grpc.github.io/grpc/core/md_doc_statuscodes.html +var grpcPushbackRetryableStatusCodes = []string{"RESOURCE_EXHAUSTED"} + +// Exponential backoff parameters +var ( + initialBackoff = 5 * time.Second + maxBackoff = 60 * time.Second + backoffMultiplier = 2.0 +) + +// RPCsWithPushbackHeaders defines the list of Gitaly RPCs to add pushback support. Please note +// that the list must only include external RPCs from clients. Adding an internal RPC, such as +// /gitaly.HookService/PackObjectsHookWithSidechannel, makes Gitaly pushes itself. +var RPCsWithPushbackHeaders = []string{ + "/gitaly.SSHService/SSHUploadPackWithSidechannel", + "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel", +} + +func init() { + for _, rpc := range RPCsWithPushbackHeaders { + if _, err := protoregistry.GitalyProtoPreregistered.LookupMethod(rpc); err != nil { + panic(fmt.Errorf("RPC not found: %s", rpc)) + } + } +} + +// DefaultPushbackMethodConfigs returns the list of gRPC method configs. Each method config includes +// name of RPC with pushback enabled, maximum attempts, and retryable status codes. This list should +// be added to client Service Config when it dials. +func DefaultPushbackMethodConfigs() []*gitalypb.MethodConfig { + var configs []*gitalypb.MethodConfig + + for _, rpc := range RPCsWithPushbackHeaders { + // Method is validated when this package is loaded + mth, _ := protoregistry.GitalyProtoPreregistered.LookupMethod(rpc) + + serviceName, methodName := mth.ServiceNameAndMethodName() + configs = append(configs, &gitalypb.MethodConfig{ + Name: []*gitalypb.MethodConfig_Name{{ + Service: serviceName, + Method: methodName, + }}, + // When specify pushback header grpc-retry-pushback-ms, client uses that + // value. Other exponential backoff parameters in RetryPolicy are ignored. + // We supply them here to make valid + RetryPolicy: &gitalypb.MethodConfig_RetryPolicy{ + MaxAttempts: uint32(grpcPushbackMaxAttempt), + RetryableStatusCodes: grpcPushbackRetryableStatusCodes, + InitialBackoff: durationpb.New(initialBackoff), + MaxBackoff: durationpb.New(maxBackoff), + BackoffMultiplier: float32(backoffMultiplier), + }, + }) + } + return configs +} + +// newLimitErrorBackoff returns an exponential backoff strategy when facing limit error. The backoff +// parameters are adjusted much longer and steeper than normal networking failure. When Gitaly node +// is saturated and starts to push-back traffic, it takes a lot of time for the situation to go +// away. The node can either wait for more room to breath or terminate in-flight requests. Either +// way, it does not make sense for clients to retry in short delays. +// +// | Retries | Delay before random jitter | +// | ------- | -------------------------- | +// | 0 | 5 second | +// | 1 | 10 seconds | +// | 2 | 20 seconds | +// | 3 | 40 seconds | +func newLimitErrorBackoff() backoff.Strategy { + exponential := backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano()))) + exponential.BaseDelay = initialBackoff + exponential.MaxDelay = maxBackoff + exponential.Multiplier = backoffMultiplier + return exponential +} + +func defaultPushbackPolicies() map[string]backoff.Strategy { + policies := map[string]backoff.Strategy{} + for _, rpc := range RPCsWithPushbackHeaders { + policies[rpc] = newLimitErrorBackoff() + } + return policies +} + +type pushback struct { + policies map[string]backoff.Strategy +} + +func (p *pushback) push(ctx context.Context, fullMethod string, err error) { + if !errors.Is(err, ErrMaxQueueSize) && !errors.Is(err, ErrMaxQueueTime) { + return + } + var strategy backoff.Strategy + strategy, exist := p.policies[fullMethod] + if !exist { + return + } + + var attempts uint + if strAttempts := metadata.ValueFromIncomingContext(ctx, GrpcPreviousAttempts); len(strAttempts) > 0 { + parsedAttempts, err := strconv.ParseInt(strAttempts[0], 10, 32) + if err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("fail to parse gRPC previous retry attempts") + return + } + attempts = uint(parsedAttempts) + } + + pushbackDuration := strategy.Backoff(attempts) + p.setResponseHeader(ctx, pushbackDuration) + p.setErrorDetail(err, pushbackDuration) +} + +func (p *pushback) setResponseHeader(ctx context.Context, pushbackDuration time.Duration) { + if err := grpc.SetTrailer(ctx, metadata.MD{GrpcPushbackHeader: []string{fmt.Sprintf("%d", pushbackDuration.Milliseconds())}}); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("fail to set gRPC push-back header") + } +} + +func (p *pushback) setErrorDetail(err error, pushbackDuration time.Duration) { + var structErr structerr.Error + if errors.As(err, &structErr) { + for _, detail := range structErr.Details() { + if limitError, ok := detail.(*gitalypb.LimitError); ok { + // The underlying layers can specify its own RetryAfter value. The + // middleware should honor that decision. + if limitError.RetryAfter.AsDuration() == time.Duration(0) { + limitError.RetryAfter = durationpb.New(pushbackDuration) + } + } + } + } +} -- GitLab