From a25dd97b37953888e9ed73268960c2a52d61e116 Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Wed, 27 Sep 2017 00:48:19 +0100 Subject: [PATCH 01/19] Rate limiter initial implementation --- .../middleware/limithandler/limithandler.go | 115 +++++++++++++++ internal/server/server.go | 5 + vendor/golang.org/x/sync/LICENSE | 27 ++++ vendor/golang.org/x/sync/PATENTS | 22 +++ .../golang.org/x/sync/semaphore/semaphore.go | 131 ++++++++++++++++++ vendor/vendor.json | 6 + 6 files changed, 306 insertions(+) create mode 100644 internal/middleware/limithandler/limithandler.go create mode 100644 vendor/golang.org/x/sync/LICENSE create mode 100644 vendor/golang.org/x/sync/PATENTS create mode 100644 vendor/golang.org/x/sync/semaphore/semaphore.go diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go new file mode 100644 index 0000000000..334c8fa725 --- /dev/null +++ b/internal/middleware/limithandler/limithandler.go @@ -0,0 +1,115 @@ +package limithandler + +import ( + "sync" + + "github.com/grpc-ecosystem/go-grpc-middleware/tags" + "golang.org/x/net/context" + "golang.org/x/sync/semaphore" + "google.golang.org/grpc" +) + +const perRepoConcurrencyLimit = 50 + +type limitedFunc func() (resp interface{}, err error) + +// Limiter contains rate limiter state +type Limiter struct { + v map[string]*semaphore.Weighted + mux sync.Mutex +} + +func (c *Limiter) getSemaphoreForRepoPath(repoPath string, max int64) *semaphore.Weighted { + c.mux.Lock() + defer c.mux.Unlock() + + s := c.v[repoPath] + if s != nil { + return s + } + + s = semaphore.NewWeighted(max) + c.v[repoPath] = s + return s +} + +func (c *Limiter) attemptCollect(repoPath string, w *semaphore.Weighted, max int64) { + if !w.TryAcquire(max) { + return + } + + c.mux.Lock() + defer c.mux.Unlock() + + delete(c.v, repoPath) +} + +func (c *Limiter) limit(ctx context.Context, repoPath string, f limitedFunc) (interface{}, error) { + w := c.getSemaphoreForRepoPath(repoPath, perRepoConcurrencyLimit) + err := w.Acquire(ctx, 1) + + if err != nil { + return nil, err + } + + defer w.Release(1) + + c.attemptCollect(repoPath, w, perRepoConcurrencyLimit) + + i, err := f() + + return i, err +} + +func getRepoPath(ctx context.Context) string { + tags := grpc_ctxtags.Extract(ctx) + ctxValue := tags.Values()["grpc.request.repoPath"] + if ctxValue == nil { + return "" + } + + s, ok := ctxValue.(string) + if ok { + return s + } + + return "" +} + +// UnaryInterceptor returns a Unary Interceptor +func (c *Limiter) UnaryInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + repoPath := getRepoPath(ctx) + if repoPath == "" { + return handler(ctx, req) + } + + return c.limit(ctx, repoPath, func() (interface{}, error) { + return handler(ctx, req) + }) + } +} + +// StreamInterceptor returns a Stream Interceptor +func (c *Limiter) StreamInterceptor() grpc.StreamServerInterceptor { + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + ctx := stream.Context() + + repoPath := getRepoPath(ctx) + if repoPath == "" { + return handler(srv, stream) + } + + _, err := c.limit(ctx, repoPath, func() (interface{}, error) { + err := handler(srv, stream) + return nil, err + }) + + return err + } +} + +// New creates a new rate limiter +func New() Limiter { + return Limiter{v: make(map[string]*semaphore.Weighted)} +} diff --git 
a/internal/server/server.go b/internal/server/server.go index 4690bb5c32..33d8da3fd3 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -5,6 +5,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" "gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler" + "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" @@ -28,6 +29,8 @@ func New(rubyServer *rubyserver.Server) *grpc.Server { grpc_ctxtags.WithFieldExtractor(fieldextractors.RepositoryFieldExtractor), } + lh := limithandler.New() + server := grpc.NewServer( grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), @@ -35,6 +38,7 @@ func New(rubyServer *rubyserver.Server) *grpc.Server { grpc_logrus.StreamServerInterceptor(logrusEntry), sentryhandler.StreamLogHandler, cancelhandler.Stream, // Should be below LogHandler + lh.StreamInterceptor(), authStreamServerInterceptor(), // Panic handler should remain last so that application panics will be // converted to errors and logged @@ -46,6 +50,7 @@ func New(rubyServer *rubyserver.Server) *grpc.Server { grpc_logrus.UnaryServerInterceptor(logrusEntry), sentryhandler.UnaryLogHandler, cancelhandler.Unary, // Should be below LogHandler + lh.UnaryInterceptor(), authUnaryServerInterceptor(), // Panic handler should remain last so that application panics will be // converted to errors and logged diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 0000000000..6a66aea5ea --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
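The vendored golang.org/x/sync/semaphore package below is the primitive the limit handler builds on. As a rough standalone sketch of how it bounds concurrency (illustration only, not part of the patch; it assumes nothing beyond the vendored import paths above):

    package main

    import (
    	"fmt"

    	"golang.org/x/net/context"
    	"golang.org/x/sync/semaphore"
    )

    func main() {
    	sem := semaphore.NewWeighted(2) // at most two concurrent holders
    	ctx := context.Background()

    	for i := 0; i < 5; i++ {
    		// Blocks until a slot frees up; fails with ctx.Err() if ctx is canceled first.
    		if err := sem.Acquire(ctx, 1); err != nil {
    			return
    		}
    		go func(i int) {
    			defer sem.Release(1)
    			fmt.Println("working on", i)
    		}(i)
    	}

    	// Claiming the full weight waits for all outstanding holders to release.
    	_ = sem.Acquire(ctx, 2)
    }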
diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 0000000000..733099041f --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go new file mode 100644 index 0000000000..e9d2d79a97 --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore.go @@ -0,0 +1,131 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package semaphore provides a weighted semaphore implementation. +package semaphore // import "golang.org/x/sync/semaphore" + +import ( + "container/list" + "sync" + + // Use the old context because packages that depend on this one + // (e.g. cloud.google.com/go/...) must run on Go 1.6. + // TODO(jba): update to "context" when possible. + "golang.org/x/net/context" +) + +type waiter struct { + n int64 + ready chan<- struct{} // Closed when semaphore acquired. +} + +// NewWeighted creates a new weighted semaphore with the given +// maximum combined weight for concurrent access. +func NewWeighted(n int64) *Weighted { + w := &Weighted{size: n} + return w +} + +// Weighted provides a way to bound concurrent access to a resource. +// The callers can request access with a given weight. +type Weighted struct { + size int64 + cur int64 + mu sync.Mutex + waiters list.List +} + +// Acquire acquires the semaphore with a weight of n, blocking only until ctx +// is done. On success, returns nil. On failure, returns ctx.Err() and leaves +// the semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (s *Weighted) Acquire(ctx context.Context, n int64) error { + s.mu.Lock() + if s.size-s.cur >= n && s.waiters.Len() == 0 { + s.cur += n + s.mu.Unlock() + return nil + } + + if n > s.size { + // Don't make other Acquire calls block on one that's doomed to fail. 
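+ // Since n exceeds the semaphore's total capacity, the request can never be
+ // satisfied; rather than joining the waiter queue, the goroutine parks until
+ // ctx is done and then reports ctx.Err(), as the lines below show.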
+ s.mu.Unlock() + <-ctx.Done() + return ctx.Err() + } + + ready := make(chan struct{}) + w := waiter{n: n, ready: ready} + elem := s.waiters.PushBack(w) + s.mu.Unlock() + + select { + case <-ctx.Done(): + err := ctx.Err() + s.mu.Lock() + select { + case <-ready: + // Acquired the semaphore after we were canceled. Rather than trying to + // fix up the queue, just pretend we didn't notice the cancelation. + err = nil + default: + s.waiters.Remove(elem) + } + s.mu.Unlock() + return err + + case <-ready: + return nil + } +} + +// TryAcquire acquires the semaphore with a weight of n without blocking. +// On success, returns true. On failure, returns false and leaves the semaphore unchanged. +func (s *Weighted) TryAcquire(n int64) bool { + s.mu.Lock() + success := s.size-s.cur >= n && s.waiters.Len() == 0 + if success { + s.cur += n + } + s.mu.Unlock() + return success +} + +// Release releases the semaphore with a weight of n. +func (s *Weighted) Release(n int64) { + s.mu.Lock() + s.cur -= n + if s.cur < 0 { + s.mu.Unlock() + panic("semaphore: bad release") + } + for { + next := s.waiters.Front() + if next == nil { + break // No more waiters blocked. + } + + w := next.Value.(waiter) + if s.size-s.cur < w.n { + // Not enough tokens for the next waiter. We could keep going (to try to + // find a waiter with a smaller request), but under load that could cause + // starvation for large requests; instead, we leave all remaining waiters + // blocked. + // + // Consider a semaphore used as a read-write lock, with N tokens, N + // readers, and one writer. Each reader can Acquire(1) to obtain a read + // lock. The writer can Acquire(N) to obtain a write lock, excluding all + // of the readers. If we allow the readers to jump ahead in the queue, + // the writer will starve — there is always one token available for every + // reader. + break + } + + s.cur += w.n + s.waiters.Remove(next) + close(w.ready) + } + s.mu.Unlock() +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 9a89c0cfe4..74090a4239 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -256,6 +256,12 @@ "revision": "59a0b19b5533c7977ddeb86b017bf507ed407b12", "revisionTime": "2017-06-03T08:13:02Z" }, + { + "checksumSHA1": "vcN67ZjTbGpLLwSghHCbAEvmzMo=", + "path": "golang.org/x/sync/semaphore", + "revision": "f52d1811a62927559de87708c8913c1650ce4f26", + "revisionTime": "2017-05-17T20:25:26Z" + }, { "checksumSHA1": "wxrHmKhFznZZAjrYK5/nWn+fZGc=", "path": "golang.org/x/sys/unix", -- GitLab From 993acb8449bacb288754050ced9c8697a6fffd35 Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Wed, 27 Sep 2017 00:53:34 +0100 Subject: [PATCH 02/19] Updated notice --- NOTICE | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/NOTICE b/NOTICE index 02b6750e4b..6441e74d74 100644 --- a/NOTICE +++ b/NOTICE @@ -1909,6 +1909,59 @@ Additional IP Rights Grant (Patents) "This implementation" means the copyrightable works distributed by Google as part of the Go project. +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. 
This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +LICENSE - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/sync +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +PATENTS - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/sync +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. 
+ Google hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, -- GitLab From 726e5edf8048a1302fae5664b9c13aedac83ed46 Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Wed, 27 Sep 2017 10:02:33 +0100 Subject: [PATCH 03/19] Separated locking and middleware concerns --- .../limithandler/concurrency_limiter.go | 81 +++++++++++++++++++ .../middleware/limithandler/limithandler.go | 77 +++++------------- 2 files changed, 99 insertions(+), 59 deletions(-) create mode 100644 internal/middleware/limithandler/concurrency_limiter.go diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go new file mode 100644 index 0000000000..acc9feec63 --- /dev/null +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -0,0 +1,81 @@ +package limithandler + +import ( + "sync" + + "golang.org/x/net/context" + "golang.org/x/sync/semaphore" +) + +// LimitedFunc represents a function that will be limited +type LimitedFunc func() (resp interface{}, err error) + +type weightedWithSize struct { + w *semaphore.Weighted + n int64 +} + +// ConcurrencyLimiter contains rate limiter state +type ConcurrencyLimiter struct { + v map[string]*weightedWithSize + mux sync.Mutex +} + +// Lazy create a semaphore for the given key +func (c *ConcurrencyLimiter) getSemaphore(lockKey string, max int64) *semaphore.Weighted { + c.mux.Lock() + defer c.mux.Unlock() + + ws := c.v[lockKey] + if ws != nil { + return ws.w + } + + w := semaphore.NewWeighted(max) + c.v[lockKey] = &weightedWithSize{w: w, n: max} + return w +} + +func (c *ConcurrencyLimiter) attemptCollection(lockKey string) { + c.mux.Lock() + defer c.mux.Unlock() + + ws := c.v[lockKey] + if ws == nil { + return + } + + max := ws.n + w := ws.w + + if !w.TryAcquire(max) { + return + } + + // If we managed to acquire all the locks, we can remove the semaphore for this key + delete(c.v, lockKey) +} + +// Limit will limit the concurrency of f +func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, maxConcurrency int64, f LimitedFunc) (interface{}, error) { + w := c.getSemaphore(lockKey, maxConcurrency) + err := w.Acquire(ctx, 1) + + if err != nil { + return nil, err + } + + defer w.Release(1) + + // Attempt to cleanup the semaphore it's no longer being used + c.attemptCollection(lockKey) + + resp, err := f() + + return resp, err +} + +// NewLimiter creates a new rate limiter +func NewLimiter() ConcurrencyLimiter { + return ConcurrencyLimiter{v: make(map[string]*weightedWithSize)} +} diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go index 334c8fa725..00f8e460f4 100644 --- a/internal/middleware/limithandler/limithandler.go +++ b/internal/middleware/limithandler/limithandler.go @@ -1,64 +1,14 @@ package limithandler import ( - "sync" - "github.com/grpc-ecosystem/go-grpc-middleware/tags" "golang.org/x/net/context" - "golang.org/x/sync/semaphore" "google.golang.org/grpc" ) -const perRepoConcurrencyLimit = 50 - -type limitedFunc func() (resp interface{}, err error) - -// Limiter contains rate limiter state -type Limiter struct { - v map[string]*semaphore.Weighted - mux sync.Mutex -} - -func (c *Limiter) getSemaphoreForRepoPath(repoPath string, max int64) *semaphore.Weighted { - c.mux.Lock() - defer c.mux.Unlock() - - s := c.v[repoPath] - if s != nil { - return s - } - - s = 
semaphore.NewWeighted(max) - c.v[repoPath] = s - return s -} - -func (c *Limiter) attemptCollect(repoPath string, w *semaphore.Weighted, max int64) { - if !w.TryAcquire(max) { - return - } - - c.mux.Lock() - defer c.mux.Unlock() - - delete(c.v, repoPath) -} - -func (c *Limiter) limit(ctx context.Context, repoPath string, f limitedFunc) (interface{}, error) { - w := c.getSemaphoreForRepoPath(repoPath, perRepoConcurrencyLimit) - err := w.Acquire(ctx, 1) - - if err != nil { - return nil, err - } - - defer w.Release(1) - - c.attemptCollect(repoPath, w, perRepoConcurrencyLimit) - - i, err := f() - - return i, err +// LimiterMiddleware contains rate limiter state +type LimiterMiddleware struct { + limiter ConcurrencyLimiter } func getRepoPath(ctx context.Context) string { @@ -76,22 +26,29 @@ func getRepoPath(ctx context.Context) string { return "" } +func getMaxConcurrency(fullMethod string, repoPath string) int64 { + // TODO: lookup the max concurrency here + return 100 +} + // UnaryInterceptor returns a Unary Interceptor -func (c *Limiter) UnaryInterceptor() grpc.UnaryServerInterceptor { +func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { repoPath := getRepoPath(ctx) if repoPath == "" { return handler(ctx, req) } - return c.limit(ctx, repoPath, func() (interface{}, error) { + maxConcurrency := getMaxConcurrency(info.FullMethod, repoPath) + + return c.limiter.Limit(ctx, repoPath, maxConcurrency, func() (interface{}, error) { return handler(ctx, req) }) } } // StreamInterceptor returns a Stream Interceptor -func (c *Limiter) StreamInterceptor() grpc.StreamServerInterceptor { +func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { ctx := stream.Context() @@ -100,7 +57,9 @@ func (c *Limiter) StreamInterceptor() grpc.StreamServerInterceptor { return handler(srv, stream) } - _, err := c.limit(ctx, repoPath, func() (interface{}, error) { + maxConcurrency := getMaxConcurrency(info.FullMethod, repoPath) + + _, err := c.limiter.Limit(ctx, repoPath, maxConcurrency, func() (interface{}, error) { err := handler(srv, stream) return nil, err }) @@ -110,6 +69,6 @@ func (c *Limiter) StreamInterceptor() grpc.StreamServerInterceptor { } // New creates a new rate limiter -func New() Limiter { - return Limiter{v: make(map[string]*semaphore.Weighted)} +func New() LimiterMiddleware { + return LimiterMiddleware{limiter: NewLimiter()} } -- GitLab From 67d2a842d95bb25a0512f75368aa8c1982609cd6 Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Wed, 27 Sep 2017 11:18:20 +0100 Subject: [PATCH 04/19] Emit rate limit metrics to logging and prometheus --- internal/config/prometheus.go | 3 + .../limithandler/concurrency_limiter.go | 1 + .../middleware/limithandler/limithandler.go | 8 ++ internal/middleware/limithandler/metrics.go | 81 +++++++++++++++++++ 4 files changed, 93 insertions(+) create mode 100644 internal/middleware/limithandler/metrics.go diff --git a/internal/config/prometheus.go b/internal/config/prometheus.go index eb5dacc5bb..2d8936f9af 100644 --- a/internal/config/prometheus.go +++ b/internal/config/prometheus.go @@ -4,6 +4,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" + 
"gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" ) // ConfigurePrometheus uses the global configuration to configure prometheus @@ -17,4 +18,6 @@ func ConfigurePrometheus() { grpc_prometheus.EnableHandlingTimeHistogram(func(histogramOpts *prometheus.HistogramOpts) { histogramOpts.Buckets = Config.Prometheus.GRPCLatencyBuckets }) + + limithandler.EnableAcquireTimeHistogram(Config.Prometheus.GRPCLatencyBuckets) } diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index acc9feec63..26dc492f47 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -59,6 +59,7 @@ func (c *ConcurrencyLimiter) attemptCollection(lockKey string) { // Limit will limit the concurrency of f func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, maxConcurrency int64, f LimitedFunc) (interface{}, error) { w := c.getSemaphore(lockKey, maxConcurrency) + err := w.Acquire(ctx, 1) if err != nil { diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go index 00f8e460f4..26e09f7127 100644 --- a/internal/middleware/limithandler/limithandler.go +++ b/internal/middleware/limithandler/limithandler.go @@ -1,6 +1,8 @@ package limithandler import ( + "time" + "github.com/grpc-ecosystem/go-grpc-middleware/tags" "golang.org/x/net/context" "google.golang.org/grpc" @@ -40,8 +42,11 @@ func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor { } maxConcurrency := getMaxConcurrency(info.FullMethod, repoPath) + start := time.Now() return c.limiter.Limit(ctx, repoPath, maxConcurrency, func() (interface{}, error) { + emitRateLimitMetrics(ctx, "unary", info.FullMethod, start) + return handler(ctx, req) }) } @@ -58,8 +63,11 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { } maxConcurrency := getMaxConcurrency(info.FullMethod, repoPath) + start := time.Now() _, err := c.limiter.Limit(ctx, repoPath, maxConcurrency, func() (interface{}, error) { + emitStreamRateLimitMetrics(ctx, info, start) + err := handler(srv, stream) return nil, err }) diff --git a/internal/middleware/limithandler/metrics.go b/internal/middleware/limithandler/metrics.go new file mode 100644 index 0000000000..b095787658 --- /dev/null +++ b/internal/middleware/limithandler/metrics.go @@ -0,0 +1,81 @@ +package limithandler + +import ( + "strings" + "time" + + prom "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + "golang.org/x/net/context" +) + +const acquireDurationLogThreshold = 10 + +var ( + histogramEnabled = false + histogram *prom.HistogramVec +) + +func emitStreamRateLimitMetrics(ctx context.Context, info *grpc.StreamServerInfo, start time.Time) { + rpcType := getRPCType(info) + emitRateLimitMetrics(ctx, rpcType, info.FullMethod, start) +} + +func emitRateLimitMetrics(ctx context.Context, rpcType string, fullMethod string, start time.Time) { + acquireDuration := time.Since(start) + + if acquireDuration > acquireDurationLogThreshold { + logger := grpc_logrus.Extract(ctx) + logger.WithField("acquire_ms", acquireDuration.Seconds()*1000).Info("Rate limit acquire wait") + } + + if histogramEnabled { + serviceName, methodName := splitMethodName(fullMethod) + histogram.WithLabelValues(rpcType, serviceName, methodName).Observe(acquireDuration.Seconds()) + } +} + +func splitMethodName(fullMethodName 
string) (string, string) { + fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash + if i := strings.Index(fullMethodName, "/"); i >= 0 { + return fullMethodName[:i], fullMethodName[i+1:] + } + return "unknown", "unknown" +} + +func getRPCType(info *grpc.StreamServerInfo) string { + if !info.IsClientStream && !info.IsServerStream { + return "unary" + } + + if info.IsClientStream && !info.IsServerStream { + return "client_stream" + } + + if !info.IsClientStream && info.IsServerStream { + return "server_stream" + } + + return "bidi_stream" +} + +// EnableAcquireTimeHistogram enables histograms for acquisition times +func EnableAcquireTimeHistogram(buckets []float64) { + histogramEnabled = true + histogramOpts := prom.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "rate_limiting", + Name: "acquiring_seconds", + Help: "Histogram of lock acquisition latency (seconds) for endpoint rate limiting", + Buckets: buckets, + } + + histogram = prom.NewHistogramVec( + histogramOpts, + []string{"grpc_type", "grpc_service", "grpc_method"}, + ) + + prom.Register(histogram) +} -- GitLab From b5b8297cba03065e8d469de6989703ea05595123 Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Wed, 27 Sep 2017 11:43:00 +0100 Subject: [PATCH 05/19] Add per-RPC configuration --- cmd/gitaly/main.go | 1 + config.toml.example | 5 ++++ internal/config/concurrency.go | 17 ++++++++++++ internal/config/config.go | 27 ++++++++++++------- .../middleware/limithandler/limithandler.go | 19 +++++++++++-- 5 files changed, 57 insertions(+), 12 deletions(-) create mode 100644 internal/config/concurrency.go diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index 48528718fa..a7f1d66d9c 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -110,6 +110,7 @@ func main() { config.ConfigureLogging() config.ConfigureSentry(version.GetVersion()) config.ConfigurePrometheus() + config.ConfigureConcurrencyLimits() var listeners []net.Listener diff --git a/config.toml.example b/config.toml.example index 5fe3942173..69c813c4ff 100644 --- a/config.toml.example +++ b/config.toml.example @@ -42,3 +42,8 @@ dir = "/home/git/gitaly/ruby" [gitlab-shell] # The directory where gitlab-shell is installed dir = "/home/git/gitlab-shell" + +# # You can adjust the concurrency of each RPC endpoint +# [[concurrency]] +# rpc = "/gitaly.RepositoryService/GarbageCollect" +# max_per_repo = 1 diff --git a/internal/config/concurrency.go b/internal/config/concurrency.go new file mode 100644 index 0000000000..a993bb801c --- /dev/null +++ b/internal/config/concurrency.go @@ -0,0 +1,17 @@ +package config + +import ( + "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" +) + +// ConfigureConcurrencyLimits configures the sentry DSN +func ConfigureConcurrencyLimits() { + var maxConcurrencyPerRepoPerRPC map[string]int64 + + maxConcurrencyPerRepoPerRPC = make(map[string]int64) + for _, v := range Config.Concurrency { + maxConcurrencyPerRepoPerRPC[v.RPC] = v.MaxPerRepo + } + + limithandler.SetMaxRepoConcurrency(maxConcurrencyPerRepoPerRPC) +} diff --git a/internal/config/config.go b/internal/config/config.go index 7fea7bf8f4..17ecfd869b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -19,16 +19,17 @@ var ( ) type config struct { - SocketPath string `toml:"socket_path" split_words:"true"` - ListenAddr string `toml:"listen_addr" split_words:"true"` - PrometheusListenAddr string `toml:"prometheus_listen_addr" split_words:"true"` - Git Git `toml:"git" envconfig:"git"` - Storages []Storage `toml:"storage" 
envconfig:"storage"` - Logging Logging `toml:"logging" envconfig:"logging"` - Prometheus Prometheus `toml:"prometheus"` - Auth Auth `toml:"auth"` - Ruby Ruby `toml:"gitaly-ruby"` - GitlabShell GitlabShell `toml:"gitlab-shell"` + SocketPath string `toml:"socket_path" split_words:"true"` + ListenAddr string `toml:"listen_addr" split_words:"true"` + PrometheusListenAddr string `toml:"prometheus_listen_addr" split_words:"true"` + Git Git `toml:"git" envconfig:"git"` + Storages []Storage `toml:"storage" envconfig:"storage"` + Logging Logging `toml:"logging" envconfig:"logging"` + Prometheus Prometheus `toml:"prometheus"` + Auth Auth `toml:"auth"` + Ruby Ruby `toml:"gitaly-ruby"` + GitlabShell GitlabShell `toml:"gitlab-shell"` + Concurrency []Concurrency `toml:"concurrency"` } // GitlabShell contains the settings required for executing `gitlab-shell` @@ -58,6 +59,12 @@ type Prometheus struct { GRPCLatencyBuckets []float64 `toml:"grpc_latency_buckets"` } +// Concurrency allows endpoints to be limited to a maximum concurrency per repo +type Concurrency struct { + RPC string `toml:"rpc"` + MaxPerRepo int64 `toml:"max_per_repo"` +} + // Load initializes the Config variable from file and the environment. // Environment variables take precedence over the file. func Load(file io.Reader) error { diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go index 26e09f7127..5b8bb77f7b 100644 --- a/internal/middleware/limithandler/limithandler.go +++ b/internal/middleware/limithandler/limithandler.go @@ -13,6 +13,8 @@ type LimiterMiddleware struct { limiter ConcurrencyLimiter } +var maxConcurrencyPerRepoPerRPC map[string]int64 + func getRepoPath(ctx context.Context) string { tags := grpc_ctxtags.Extract(ctx) ctxValue := tags.Values()["grpc.request.repoPath"] @@ -29,8 +31,7 @@ func getRepoPath(ctx context.Context) string { } func getMaxConcurrency(fullMethod string, repoPath string) int64 { - // TODO: lookup the max concurrency here - return 100 + return maxConcurrencyPerRepoPerRPC[fullMethod] } // UnaryInterceptor returns a Unary Interceptor @@ -42,6 +43,10 @@ func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor { } maxConcurrency := getMaxConcurrency(info.FullMethod, repoPath) + if maxConcurrency <= 0 { + // No concurrency limiting + return handler(ctx, req) + } start := time.Now() return c.limiter.Limit(ctx, repoPath, maxConcurrency, func() (interface{}, error) { @@ -63,6 +68,11 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { } maxConcurrency := getMaxConcurrency(info.FullMethod, repoPath) + if maxConcurrency <= 0 { + // No concurrency limiting + return handler(srv, stream) + } + start := time.Now() _, err := c.limiter.Limit(ctx, repoPath, maxConcurrency, func() (interface{}, error) { @@ -80,3 +90,8 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { func New() LimiterMiddleware { return LimiterMiddleware{limiter: NewLimiter()} } + +// SetMaxRepoConcurrency Configures the max concurrency per repo per RPC +func SetMaxRepoConcurrency(v map[string]int64) { + maxConcurrencyPerRepoPerRPC = v +} -- GitLab From 45fa6e6f8941566ceef543af2eac9a1b5fc4060a Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Wed, 27 Sep 2017 12:46:56 +0100 Subject: [PATCH 06/19] Add concurrency limiter test --- internal/config/concurrency.go | 3 +- .../limithandler/concurrency_limiter.go | 10 +- .../limithandler/concurrency_limiter_test.go | 113 ++++++++++++++++++ 
internal/middleware/limithandler/metrics.go | 2 +- 4 files changed, 122 insertions(+), 6 deletions(-) create mode 100644 internal/middleware/limithandler/concurrency_limiter_test.go diff --git a/internal/config/concurrency.go b/internal/config/concurrency.go index a993bb801c..1c6f7de31b 100644 --- a/internal/config/concurrency.go +++ b/internal/config/concurrency.go @@ -6,9 +6,8 @@ import ( // ConfigureConcurrencyLimits configures the sentry DSN func ConfigureConcurrencyLimits() { - var maxConcurrencyPerRepoPerRPC map[string]int64 + maxConcurrencyPerRepoPerRPC := make(map[string]int64) - maxConcurrencyPerRepoPerRPC = make(map[string]int64) for _, v := range Config.Concurrency { maxConcurrencyPerRepoPerRPC[v.RPC] = v.MaxPerRepo } diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index 26dc492f47..939641ee0b 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -54,11 +54,18 @@ func (c *ConcurrencyLimiter) attemptCollection(lockKey string) { // If we managed to acquire all the locks, we can remove the semaphore for this key delete(c.v, lockKey) + } // Limit will limit the concurrency of f func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, maxConcurrency int64, f LimitedFunc) (interface{}, error) { + if maxConcurrency <= 0 { + return f() + } + w := c.getSemaphore(lockKey, maxConcurrency) + // Attempt to cleanup the semaphore it's no longer being used + defer c.attemptCollection(lockKey) err := w.Acquire(ctx, 1) @@ -68,9 +75,6 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, maxConcu defer w.Release(1) - // Attempt to cleanup the semaphore it's no longer being used - c.attemptCollection(lockKey) - resp, err := f() return resp, err diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go new file mode 100644 index 0000000000..d49c401e7f --- /dev/null +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -0,0 +1,113 @@ +package limithandler + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "golang.org/x/net/context" +) + +type counter struct { + m *sync.Mutex + max int + current int +} + +func (c *counter) up() { + c.m.Lock() + defer c.m.Unlock() + + c.current = c.current + 1 + if c.current > c.max { + c.max = c.current + } +} + +func (c *counter) down() { + c.m.Lock() + defer c.m.Unlock() + + c.current = c.current - 1 +} + +func TestLimiter(t *testing.T) { + tests := []struct { + name string + concurrency int + maxConcurrency int64 + iterations int + buckets int + wantMax int + }{ + { + name: "single", + concurrency: 1, + maxConcurrency: 1, + iterations: 1, + buckets: 1, + wantMax: 1, + }, + { + name: "two-at-a-time", + concurrency: 10, + maxConcurrency: 2, + iterations: 1, + buckets: 1, + wantMax: 2, + }, + { + name: "two-by-two", + concurrency: 10, + maxConcurrency: 2, + iterations: 4, + buckets: 2, + wantMax: 4, + }, + { + name: "no-limit", + concurrency: 2, + maxConcurrency: 0, + iterations: 2, + buckets: 1, + wantMax: 2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + limiter := NewLimiter() + wg := sync.WaitGroup{} + wg.Add(tt.concurrency) + + gauge := &counter{m: &sync.Mutex{}} + for c := 0; c < tt.concurrency; c++ { + go func() { + + for i := 0; i < tt.iterations; i++ { + lockKey := fmt.Sprintf("key:%v", i%tt.buckets) 
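+ // i%tt.buckets fans iterations out over tt.buckets distinct lock keys, so
+ // runs with buckets > 1 exercise several independent semaphores in one limiter.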
+ + limiter.Limit(context.Background(), lockKey, tt.maxConcurrency, func() (interface{}, error) { + gauge.up() + + assert.True(t, int64(len(limiter.v)) <= tt.maxConcurrency) + assert.True(t, len(limiter.v) <= tt.buckets) + time.Sleep(1 * time.Millisecond) + gauge.down() + return nil, nil + }) + } + + wg.Done() + }() + } + + wg.Wait() + assert.Equal(t, tt.wantMax, gauge.max) + assert.Equal(t, 0, gauge.current) + assert.Equal(t, 0, len(limiter.v)) + }) + } +} diff --git a/internal/middleware/limithandler/metrics.go b/internal/middleware/limithandler/metrics.go index b095787658..12d8c3c127 100644 --- a/internal/middleware/limithandler/metrics.go +++ b/internal/middleware/limithandler/metrics.go @@ -11,7 +11,7 @@ import ( "golang.org/x/net/context" ) -const acquireDurationLogThreshold = 10 +const acquireDurationLogThreshold = 10 * time.Millisecond -- GitLab From 921c178db4ab1cbbf569755d843da741102a80e9 Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Wed, 27 Sep 2017 13:04:42 +0100 Subject: [PATCH 07/19] Fixed comment [skip ci] --- internal/config/concurrency.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/config/concurrency.go b/internal/config/concurrency.go index 1c6f7de31b..99f81a1c82 100644 --- a/internal/config/concurrency.go +++ b/internal/config/concurrency.go @@ -4,7 +4,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" ) -// ConfigureConcurrencyLimits configures the sentry DSN +// ConfigureConcurrencyLimits configures the per-repo, per-RPC rate limits -- GitLab From 2e2e85a4c9baf2410415f7aa91e45f643caffccb Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Wed, 27 Sep 2017 21:46:39 +0100 Subject: [PATCH 08/19] After a great call with Jacob, rework semaphore collection so a collected semaphore cannot lock up goroutines still waiting to acquire it --- internal/config/concurrency.go | 2 +- internal/config/config.go | 2 +- .../limithandler/concurrency_limiter.go | 22 ++++++---- .../limithandler/concurrency_limiter_test.go | 4 +- .../middleware/limithandler/limithandler.go | 41 ++++++++++++------- 5 files changed, 46 insertions(+), 25 deletions(-) diff --git a/internal/config/concurrency.go b/internal/config/concurrency.go index 99f81a1c82..7211bd3e1d 100644 --- a/internal/config/concurrency.go +++ b/internal/config/concurrency.go @@ -6,7 +6,7 @@ import ( // ConfigureConcurrencyLimits configures the per-repo, per-RPC rate limits func ConfigureConcurrencyLimits() { - maxConcurrencyPerRepoPerRPC := make(map[string]int64) + maxConcurrencyPerRepoPerRPC := make(map[string]int) for _, v := range Config.Concurrency { maxConcurrencyPerRepoPerRPC[v.RPC] = v.MaxPerRepo diff --git a/internal/config/config.go b/internal/config/config.go index 17ecfd869b..25d57398e2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -62,7 +62,7 @@ type Prometheus struct { // Concurrency allows endpoints to be limited to a maximum concurrency per repo type Concurrency struct { RPC string `toml:"rpc"` - MaxPerRepo int64 `toml:"max_per_repo"` + MaxPerRepo int `toml:"max_per_repo"` } // Load initializes the Config variable from file and the environment. // Environment variables take precedence over the file. 
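As a reading aid, here is a minimal sketch (hypothetical values, not part of the series) of how a [[concurrency]] entry flows into the limiter once this patch's plain-int types land, using only the config and limithandler APIs shown in these diffs:

    // A [[concurrency]] TOML block decodes into a config.Concurrency value; the
    // GarbageCollect RPC name is borrowed from config.toml.example in patch 05.
    entries := []config.Concurrency{
    	{RPC: "/gitaly.RepositoryService/GarbageCollect", MaxPerRepo: 1},
    }

    // ConfigureConcurrencyLimits flattens the entries into a map keyed by full
    // gRPC method name...
    limits := make(map[string]int)
    for _, c := range entries {
    	limits[c.RPC] = c.MaxPerRepo
    }

    // ...which must be installed before limithandler.New() snapshots it into its
    // per-RPC limiter configs, matching the call order in cmd/gitaly/main.go.
    limithandler.SetMaxRepoConcurrency(limits)
    lh := limithandler.New()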
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index 939641ee0b..c543ef4a9e 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -11,18 +11,19 @@ import ( type LimitedFunc func() (resp interface{}, err error) type weightedWithSize struct { + // https://godoc.org/golang.org/x/sync/semaphore w *semaphore.Weighted n int64 } // ConcurrencyLimiter contains rate limiter state type ConcurrencyLimiter struct { - v map[string]*weightedWithSize - mux sync.Mutex + v map[interface{}]*weightedWithSize + mux *sync.Mutex } // Lazy create a semaphore for the given key -func (c *ConcurrencyLimiter) getSemaphore(lockKey string, max int64) *semaphore.Weighted { +func (c *ConcurrencyLimiter) getSemaphore(lockKey interface{}, max int64) *semaphore.Weighted { c.mux.Lock() defer c.mux.Unlock() @@ -36,7 +37,7 @@ func (c *ConcurrencyLimiter) getSemaphore(lockKey string, max int64) *semaphore. return w } -func (c *ConcurrencyLimiter) attemptCollection(lockKey string) { +func (c *ConcurrencyLimiter) attemptCollection(lockKey interface{}) { c.mux.Lock() defer c.mux.Unlock() @@ -52,18 +53,22 @@ func (c *ConcurrencyLimiter) attemptCollection(lockKey string) { return } + // By releasing, we prevent a lockup of goroutines that have already + // obtained this semaphore from the map, but have yet to acquire a slot on it + w.Release(max) + // If we managed to acquire all the locks, we can remove the semaphore for this key delete(c.v, lockKey) } // Limit will limit the concurrency of f -func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, maxConcurrency int64, f LimitedFunc) (interface{}, error) { +func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey interface{}, maxConcurrency int, f LimitedFunc) (interface{}, error) { if maxConcurrency <= 0 { return f() } - w := c.getSemaphore(lockKey, maxConcurrency) + w := c.getSemaphore(lockKey, int64(maxConcurrency)) // Attempt to cleanup the semaphore it's no longer being used defer c.attemptCollection(lockKey) err := w.Acquire(ctx, 1) @@ -82,5 +87,8 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, maxConcu // NewLimiter creates a new rate limiter -func NewLimiter() ConcurrencyLimiter { - return ConcurrencyLimiter{v: make(map[string]*weightedWithSize)} +func NewLimiter() ConcurrencyLimiter { + return ConcurrencyLimiter{ + v: make(map[interface{}]*weightedWithSize), + mux: &sync.Mutex{}, + } } diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index d49c401e7f..876061d22a 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -38,7 +38,7 @@ func TestLimiter(t *testing.T) { tests := []struct { name string concurrency int - maxConcurrency int64 + maxConcurrency int iterations int buckets int wantMax int @@ -92,7 +92,7 @@ func TestLimiter(t *testing.T) { limiter.Limit(context.Background(), lockKey, tt.maxConcurrency, func() (interface{}, error) { gauge.up() - assert.True(t, int64(len(limiter.v)) <= tt.maxConcurrency) + assert.True(t, len(limiter.v) <= tt.maxConcurrency) assert.True(t, len(limiter.v) <= tt.buckets) time.Sleep(1 * time.Millisecond) gauge.down() diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go index 5b8bb77f7b..5929a19a5e 100644 --- a/internal/middleware/limithandler/limithandler.go +++ 
b/internal/middleware/limithandler/limithandler.go @@ -10,10 +10,15 @@ import ( // LimiterMiddleware contains rate limiter state type LimiterMiddleware struct { + rpcLimiterConfigs map[string]*rpcLimiterConfig +} + +type rpcLimiterConfig struct { limiter ConcurrencyLimiter + max int } -var maxConcurrencyPerRepoPerRPC map[string]int64 +var maxConcurrencyPerRepoPerRPC map[string]int func getRepoPath(ctx context.Context) string { tags := grpc_ctxtags.Extract(ctx) @@ -30,10 +35,6 @@ func getRepoPath(ctx context.Context) string { return "" } -func getMaxConcurrency(fullMethod string, repoPath string) int64 { - return maxConcurrencyPerRepoPerRPC[fullMethod] -} - // UnaryInterceptor returns a Unary Interceptor func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { @@ -42,14 +43,15 @@ func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor { return handler(ctx, req) } - maxConcurrency := getMaxConcurrency(info.FullMethod, repoPath) - if maxConcurrency <= 0 { + config := c.rpcLimiterConfigs[info.FullMethod] + if config == nil { // No concurrency limiting return handler(ctx, req) } + start := time.Now() - return c.limiter.Limit(ctx, repoPath, maxConcurrency, func() (interface{}, error) { + return config.limiter.Limit(ctx, repoPath, config.max, func() (interface{}, error) { emitRateLimitMetrics(ctx, "unary", info.FullMethod, start) return handler(ctx, req) @@ -67,15 +69,15 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { return handler(srv, stream) } - maxConcurrency := getMaxConcurrency(info.FullMethod, repoPath) - if maxConcurrency <= 0 { + config := c.rpcLimiterConfigs[info.FullMethod] + if config == nil { // No concurrency limiting return handler(srv, stream) } start := time.Now() - _, err := c.limiter.Limit(ctx, repoPath, maxConcurrency, func() (interface{}, error) { + _, err := config.limiter.Limit(ctx, repoPath, config.max, func() (interface{}, error) { emitStreamRateLimitMetrics(ctx, info, start) err := handler(srv, stream) @@ -88,10 +90,21 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { // New creates a new rate limiter func New() LimiterMiddleware { - return LimiterMiddleware{limiter: NewLimiter()} + return LimiterMiddleware{ + rpcLimiterConfigs: createLimiterConfig(), + } +} + +func createLimiterConfig() map[string]*rpcLimiterConfig { + m := make(map[string]*rpcLimiterConfig) + for k, v := range maxConcurrencyPerRepoPerRPC { + m[k] = &rpcLimiterConfig{limiter: NewLimiter(), max: v} + } + + return m } // SetMaxRepoConcurrency Configures the max concurrency per repo per RPC -func SetMaxRepoConcurrency(v map[string]int64) { - maxConcurrencyPerRepoPerRPC = v +func SetMaxRepoConcurrency(config map[string]int) { + maxConcurrencyPerRepoPerRPC = config } -- GitLab From af124667ea53bfab74ca46b9bee158b125616831 Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Thu, 28 Sep 2017 12:44:34 +0100 Subject: [PATCH 09/19] Added extra comments --- internal/middleware/limithandler/concurrency_limiter.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index c543ef4a9e..8bdbdca0d5 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -11,6 +11,9 @@ import ( type LimitedFunc func() 
(resp interface{}, err error) type weightedWithSize struct { + // A weighted semaphore is like a mutex, but with a number of 'slots'. + // When locking the locker requests 1 or more slots to be locked. + // In this package, the number of slots is the number of concurrent requests the rate limiter lets through. // https://godoc.org/golang.org/x/sync/semaphore w *semaphore.Weighted n int64 -- GitLab From b0b49143752e8b2356b1532d43064eb757fefdb1 Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Thu, 28 Sep 2017 13:56:05 +0100 Subject: [PATCH 10/19] Added prometheus gauges for current queue length and current concurrency for each endpoint --- .../limithandler/concurrency_limiter.go | 38 ++++++- .../limithandler/concurrency_limiter_test.go | 2 +- .../middleware/limithandler/limithandler.go | 12 +- internal/middleware/limithandler/metrics.go | 106 ++++++++++++------ 4 files changed, 104 insertions(+), 54 deletions(-) diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index 8bdbdca0d5..2fc31d1bba 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -2,6 +2,7 @@ package limithandler import ( "sync" + "time" "golang.org/x/net/context" "golang.org/x/sync/semaphore" @@ -19,10 +20,19 @@ type weightedWithSize struct { n int64 } +// ConcurrencyMonitor allows the concurrency monitor to be observed +type ConcurrencyMonitor interface { + Queued(ctx context.Context) + Dequeued(ctx context.Context) + Enter(ctx context.Context, acquireTime time.Duration) + Exit(ctx context.Context) +} + // ConcurrencyLimiter contains rate limiter state type ConcurrencyLimiter struct { - v map[interface{}]*weightedWithSize - mux *sync.Mutex + v map[interface{}]*weightedWithSize + mux *sync.Mutex + monitor ConcurrencyMonitor } // Lazy create a semaphore for the given key @@ -71,16 +81,33 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey interface{}, max return f() } + var start time.Time + + if c.monitor != nil { + start = time.Now() + c.monitor.Queued(ctx) + } + w := c.getSemaphore(lockKey, int64(maxConcurrency)) + // Attempt to cleanup the semaphore it's no longer being used defer c.attemptCollection(lockKey) err := w.Acquire(ctx, 1) + if c.monitor != nil { + c.monitor.Dequeued(ctx) + } + if err != nil { return nil, err } + if c.monitor != nil { + c.monitor.Enter(ctx, time.Since(start)) + defer c.monitor.Exit(ctx) + } + defer w.Release(1) resp, err := f() @@ -89,9 +116,10 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey interface{}, max } // NewLimiter creates a new rate limiter -func NewLimiter() ConcurrencyLimiter { +func NewLimiter(monitor ConcurrencyMonitor) ConcurrencyLimiter { return ConcurrencyLimiter{ - v: make(map[interface{}]*weightedWithSize), - mux: &sync.Mutex{}, + v: make(map[interface{}]*weightedWithSize), + mux: &sync.Mutex{}, + monitor: monitor, } } diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index 876061d22a..3a5977c22d 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -78,7 +78,7 @@ func TestLimiter(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - limiter := NewLimiter() + limiter := NewLimiter(nil) wg := sync.WaitGroup{} wg.Add(tt.concurrency) diff --git 
a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go index 5929a19a5e..26a54c4db9 100644 --- a/internal/middleware/limithandler/limithandler.go +++ b/internal/middleware/limithandler/limithandler.go @@ -1,8 +1,6 @@ package limithandler import ( - "time" - "github.com/grpc-ecosystem/go-grpc-middleware/tags" "golang.org/x/net/context" "google.golang.org/grpc" @@ -49,11 +47,7 @@ func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor { return handler(ctx, req) } - start := time.Now() - return config.limiter.Limit(ctx, repoPath, config.max, func() (interface{}, error) { - emitRateLimitMetrics(ctx, "unary", info.FullMethod, start) - return handler(ctx, req) }) } @@ -75,11 +69,7 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { return handler(srv, stream) } - start := time.Now() - _, err := config.limiter.Limit(ctx, repoPath, config.max, func() (interface{}, error) { - emitStreamRateLimitMetrics(ctx, info, start) - err := handler(srv, stream) return nil, err }) @@ -98,7 +88,7 @@ func New() LimiterMiddleware { func createLimiterConfig() map[string]*rpcLimiterConfig { m := make(map[string]*rpcLimiterConfig) for k, v := range maxConcurrencyPerRepoPerRPC { - m[k] = &rpcLimiterConfig{limiter: NewLimiter(), max: v} + m[k] = &rpcLimiterConfig{limiter: NewLimiter(newPromMonitor(k)), max: v} } return m diff --git a/internal/middleware/limithandler/metrics.go b/internal/middleware/limithandler/metrics.go index 12d8c3c127..f71a50c397 100644 --- a/internal/middleware/limithandler/metrics.go +++ b/internal/middleware/limithandler/metrics.go @@ -5,7 +5,6 @@ import ( "time" prom "github.com/prometheus/client_golang/prometheus" - "google.golang.org/grpc" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" "golang.org/x/net/context" @@ -14,27 +13,37 @@ import ( const acquireDurationLogThreshold = 10 * time.Millisecond var ( - histogramEnabled = false - histogram *prom.HistogramVec + histogramEnabled = false + histogramVec *prom.HistogramVec + inprogressGaugeVec = prom.NewGaugeVec( + prom.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "rate_limiting", + Name: "in_progress", + Help: "Gauge of the number of concurrent invocations currently in progress for this endpoint", + }, + []string{"grpc_service", "grpc_method"}, + ) + + queuedGaugeVec = prom.NewGaugeVec( + prom.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "rate_limiting", + Name: "queued", + Help: "Gauge of the number of invocations currently queued for this endpoint", + }, + []string{"grpc_service", "grpc_method"}, + ) ) -func emitStreamRateLimitMetrics(ctx context.Context, info *grpc.StreamServerInfo, start time.Time) { - rpcType := getRPCType(info) - emitRateLimitMetrics(ctx, rpcType, info.FullMethod, start) +type promMonitor struct { + queuedGauge prom.Gauge + inprogressGauge prom.Gauge + histogram prom.Histogram } -func emitRateLimitMetrics(ctx context.Context, rpcType string, fullMethod string, start time.Time) { - acquireDuration := time.Since(start) - - if acquireDuration > acquireDurationLogThreshold { - logger := grpc_logrus.Extract(ctx) - logger.WithField("acquire_ms", acquireDuration.Seconds()*1000).Info("Rate limit acquire wait") - } - - if histogramEnabled { - serviceName, methodName := splitMethodName(fullMethod) - histogram.WithLabelValues(rpcType, serviceName, methodName).Observe(acquireDuration.Seconds()) - } +func init() { + prom.MustRegister(inprogressGaugeVec, queuedGaugeVec) } func splitMethodName(fullMethodName 
string) (string, string) { @@ -45,22 +54,6 @@ func splitMethodName(fullMethodName string) (string, string) { return "unknown", "unknown" } -func getRPCType(info *grpc.StreamServerInfo) string { - if !info.IsClientStream && !info.IsServerStream { - return "unary" - } - - if info.IsClientStream && !info.IsServerStream { - return "client_stream" - } - - if !info.IsClientStream && info.IsServerStream { - return "server_stream" - } - - return "bidi_stream" -} - // EnableAcquireTimeHistogram enables histograms for acquisition times func EnableAcquireTimeHistogram(buckets []float64) { histogramEnabled = true @@ -72,10 +65,49 @@ func EnableAcquireTimeHistogram(buckets []float64) { Buckets: buckets, } - histogram = prom.NewHistogramVec( + histogramVec = prom.NewHistogramVec( histogramOpts, - []string{"grpc_type", "grpc_service", "grpc_method"}, + []string{"grpc_service", "grpc_method"}, ) - prom.Register(histogram) + prom.Register(histogramVec) +} + +func (c promMonitor) Queued(ctx context.Context) { + c.queuedGauge.Inc() +} + +func (c promMonitor) Dequeued(ctx context.Context) { + c.queuedGauge.Dec() +} + +func (c promMonitor) Enter(ctx context.Context, acquireTime time.Duration) { + c.inprogressGauge.Inc() + + if acquireTime > acquireDurationLogThreshold { + logger := grpc_logrus.Extract(ctx) + logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait") + } + + if c.histogram != nil { + c.histogram.Observe(acquireTime.Seconds()) + } +} + +func (c promMonitor) Exit(ctx context.Context) { + c.inprogressGauge.Dec() +} + +func newPromMonitor(fullMethod string) ConcurrencyMonitor { + serviceName, methodName := splitMethodName(fullMethod) + + queuedGauge := queuedGaugeVec.WithLabelValues(serviceName, methodName) + inprogressGauge := inprogressGaugeVec.WithLabelValues(serviceName, methodName) + + var histogram prom.Histogram + if histogramVec != nil { + histogram = histogramVec.WithLabelValues(serviceName, methodName) + } + + return &promMonitor{queuedGauge, inprogressGauge, histogram} } -- GitLab From 11b7b932226c4cd9e985cbebb71187967684214d Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Thu, 28 Sep 2017 16:28:22 +0100 Subject: [PATCH 11/19] Cleanups based on Jacob's review --- .../limithandler/concurrency_limiter.go | 31 +++++++++--------- .../limithandler/concurrency_limiter_test.go | 32 +++++++++++-------- .../middleware/limithandler/limithandler.go | 3 +- internal/middleware/limithandler/metrics.go | 8 ++--- 4 files changed, 40 insertions(+), 34 deletions(-) diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index 2fc31d1bba..189dd68e60 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -72,7 +72,6 @@ func (c *ConcurrencyLimiter) attemptCollection(lockKey interface{}) { // If we managed to acquire all the locks, we can remove the semaphore for this key delete(c.v, lockKey) - } // Limit will limit the concurrency of f @@ -81,12 +80,8 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey interface{}, max return f() } - var start time.Time - - if c.monitor != nil { - start = time.Now() - c.monitor.Queued(ctx) - } + start := time.Now() + c.monitor.Queued(ctx) w := c.getSemaphore(lockKey, int64(maxConcurrency)) @@ -94,19 +89,14 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey interface{}, max defer c.attemptCollection(lockKey) err := w.Acquire(ctx, 1) - - if c.monitor != nil { - 
c.monitor.Dequeued(ctx) - } + c.monitor.Dequeued(ctx) if err != nil { return nil, err } - if c.monitor != nil { - c.monitor.Enter(ctx, time.Since(start)) - defer c.monitor.Exit(ctx) - } + c.monitor.Enter(ctx, time.Since(start)) + defer c.monitor.Exit(ctx) defer w.Release(1) @@ -117,9 +107,20 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey interface{}, max // NewLimiter creates a new rate limiter func NewLimiter(monitor ConcurrencyMonitor) ConcurrencyLimiter { + if monitor == nil { + monitor = &nullConcurrencyMonitor{} + } + return ConcurrencyLimiter{ v: make(map[interface{}]*weightedWithSize), mux: &sync.Mutex{}, monitor: monitor, } } + +type nullConcurrencyMonitor struct{} + +func (c *nullConcurrencyMonitor) Queued(ctx context.Context) {} +func (c *nullConcurrencyMonitor) Dequeued(ctx context.Context) {} +func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Duration) {} +func (c *nullConcurrencyMonitor) Exit(ctx context.Context) {} diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index 3a5977c22d..8ef266bf8c 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -12,14 +12,14 @@ import ( ) type counter struct { - m *sync.Mutex + sync.Mutex max int current int } func (c *counter) up() { - c.m.Lock() - defer c.m.Unlock() + c.Lock() + defer c.Unlock() c.current = c.current + 1 if c.current > c.max { @@ -28,8 +28,8 @@ func (c *counter) up() { } func (c *counter) down() { - c.m.Lock() - defer c.m.Unlock() + c.Lock() + defer c.Unlock() c.current = c.current - 1 } @@ -40,6 +40,7 @@ func TestLimiter(t *testing.T) { concurrency int maxConcurrency int iterations int + delay time.Duration buckets int wantMax int }{ @@ -48,32 +49,36 @@ func TestLimiter(t *testing.T) { concurrency: 1, maxConcurrency: 1, iterations: 1, + delay: 1 * time.Millisecond, buckets: 1, wantMax: 1, }, { name: "two-at-a-time", - concurrency: 10, + concurrency: 100, maxConcurrency: 2, - iterations: 1, + iterations: 10, + delay: 1 * time.Millisecond, buckets: 1, wantMax: 2, }, { name: "two-by-two", - concurrency: 10, + concurrency: 100, maxConcurrency: 2, + delay: 1 * time.Nanosecond, iterations: 4, buckets: 2, wantMax: 4, }, { name: "no-limit", - concurrency: 2, + concurrency: 10, maxConcurrency: 0, - iterations: 2, + iterations: 200, + delay: 1 * time.Nanosecond, buckets: 1, - wantMax: 2, + wantMax: 10, }, } for _, tt := range tests { @@ -82,7 +87,7 @@ func TestLimiter(t *testing.T) { wg := sync.WaitGroup{} wg.Add(tt.concurrency) - gauge := &counter{m: &sync.Mutex{}} + gauge := &counter{} for c := 0; c < tt.concurrency; c++ { go func() { @@ -94,7 +99,8 @@ func TestLimiter(t *testing.T) { assert.True(t, len(limiter.v) <= tt.maxConcurrency) assert.True(t, len(limiter.v) <= tt.buckets) - time.Sleep(1 * time.Millisecond) + time.Sleep(tt.delay) + gauge.down() return nil, nil }) diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go index 26a54c4db9..1c7dbd60ea 100644 --- a/internal/middleware/limithandler/limithandler.go +++ b/internal/middleware/limithandler/limithandler.go @@ -70,8 +70,7 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { } _, err := config.limiter.Limit(ctx, repoPath, config.max, func() (interface{}, error) { - err := handler(srv, stream) - return nil, err + return nil, handler(srv, stream) }) return err diff 
--git a/internal/middleware/limithandler/metrics.go b/internal/middleware/limithandler/metrics.go index f71a50c397..bbb9ad533a 100644 --- a/internal/middleware/limithandler/metrics.go +++ b/internal/middleware/limithandler/metrics.go @@ -73,15 +73,15 @@ func EnableAcquireTimeHistogram(buckets []float64) { prom.Register(histogramVec) } -func (c promMonitor) Queued(ctx context.Context) { +func (c *promMonitor) Queued(ctx context.Context) { c.queuedGauge.Inc() } -func (c promMonitor) Dequeued(ctx context.Context) { +func (c *promMonitor) Dequeued(ctx context.Context) { c.queuedGauge.Dec() } -func (c promMonitor) Enter(ctx context.Context, acquireTime time.Duration) { +func (c *promMonitor) Enter(ctx context.Context, acquireTime time.Duration) { c.inprogressGauge.Inc() if acquireTime > acquireDurationLogThreshold { @@ -94,7 +94,7 @@ func (c promMonitor) Enter(ctx context.Context, acquireTime time.Duration) { } } -func (c promMonitor) Exit(ctx context.Context) { +func (c *promMonitor) Exit(ctx context.Context) { c.inprogressGauge.Dec() } -- GitLab From d57eb44a8432cfa3953efc2354cb746f385eebd6 Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Thu, 28 Sep 2017 17:04:35 +0100 Subject: [PATCH 12/19] Use a shared max, making the code much simpler --- .../limithandler/concurrency_limiter.go | 58 ++++---- .../limithandler/concurrency_limiter_test.go | 135 ++++++++++++------ .../middleware/limithandler/limithandler.go | 32 ++--- 3 files changed, 131 insertions(+), 94 deletions(-) diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index 189dd68e60..0cc86bbd73 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -11,15 +11,6 @@ import ( // LimitedFunc represents a function that will be limited type LimitedFunc func() (resp interface{}, err error) -type weightedWithSize struct { - // A weighted semaphore is like a mutex, but with a number of 'slots'. - // When locking the locker requests 1 or more slots to be locked. - // In this package, the number of slots is the number of concurrent requests the rate limiter lets through. - // https://godoc.org/golang.org/x/sync/semaphore - w *semaphore.Weighted - n int64 -} - // ConcurrencyMonitor allows the concurrency monitor to be observed type ConcurrencyMonitor interface { Queued(ctx context.Context) @@ -30,23 +21,28 @@ type ConcurrencyMonitor interface { // ConcurrencyLimiter contains rate limiter state type ConcurrencyLimiter struct { - v map[interface{}]*weightedWithSize - mux *sync.Mutex - monitor ConcurrencyMonitor + // A weighted semaphore is like a mutex, but with a number of 'slots'. + // When locking the locker requests 1 or more slots to be locked. + // In this package, the number of slots is the number of concurrent requests the rate limiter lets through. 
+ // https://godoc.org/golang.org/x/sync/semaphore + semaphores map[interface{}]*semaphore.Weighted + max int64 + mux *sync.Mutex + monitor ConcurrencyMonitor } // Lazy create a semaphore for the given key -func (c *ConcurrencyLimiter) getSemaphore(lockKey interface{}, max int64) *semaphore.Weighted { +func (c *ConcurrencyLimiter) getSemaphore(lockKey interface{}) *semaphore.Weighted { c.mux.Lock() defer c.mux.Unlock() - ws := c.v[lockKey] + ws := c.semaphores[lockKey] if ws != nil { - return ws.w + return ws } - w := semaphore.NewWeighted(max) - c.v[lockKey] = &weightedWithSize{w: w, n: max} + w := semaphore.NewWeighted(c.max) + c.semaphores[lockKey] = w return w } @@ -54,36 +50,33 @@ func (c *ConcurrencyLimiter) attemptCollection(lockKey interface{}) { c.mux.Lock() defer c.mux.Unlock() - ws := c.v[lockKey] + ws := c.semaphores[lockKey] if ws == nil { return } - max := ws.n - w := ws.w - - if !w.TryAcquire(max) { + if !ws.TryAcquire(c.max) { return } // By releasing, we prevent a lockup of goroutines that have already // acquired the semaphore, but have yet to acquire on it - w.Release(max) + ws.Release(c.max) // If we managed to acquire all the locks, we can remove the semaphore for this key - delete(c.v, lockKey) + delete(c.semaphores, lockKey) } // Limit will limit the concurrency of f -func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey interface{}, maxConcurrency int, f LimitedFunc) (interface{}, error) { - if maxConcurrency <= 0 { +func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey interface{}, f LimitedFunc) (interface{}, error) { + if c.max <= 0 { return f() } start := time.Now() c.monitor.Queued(ctx) - w := c.getSemaphore(lockKey, int64(maxConcurrency)) + w := c.getSemaphore(lockKey) // Attempt to clean up the semaphore if it's no longer being used defer c.attemptCollection(lockKey) @@ -106,15 +99,16 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey interface{}, max } // NewLimiter creates a new rate limiter -func NewLimiter(monitor ConcurrencyMonitor) ConcurrencyLimiter { +func NewLimiter(max int, monitor ConcurrencyMonitor) *ConcurrencyLimiter { if monitor == nil { monitor = &nullConcurrencyMonitor{} } - return ConcurrencyLimiter{ - v: make(map[interface{}]*weightedWithSize), - mux: &sync.Mutex{}, - monitor: monitor, + return &ConcurrencyLimiter{ + semaphores: make(map[interface{}]*semaphore.Weighted), + max: int64(max), + mux: &sync.Mutex{}, + monitor: monitor, } } diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index 8ef266bf8c..406b794c6d 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -13,8 +13,12 @@ import ( type counter struct { sync.Mutex - max int - current int + max int + current int + queued int + dequeued int + enter int + exit int } func (c *counter) up() { @@ -34,76 +38,114 @@ func (c *counter) down() { c.current = c.current - 1 } +func (c *counter) Queued(ctx context.Context) { + c.Lock() + defer c.Unlock() + c.queued++ +} + +func (c *counter) Dequeued(ctx context.Context) { + c.Lock() + defer c.Unlock() + c.dequeued++ +} + +func (c *counter) Enter(ctx context.Context, acquireTime time.Duration) { + c.Lock() + defer c.Unlock() + c.enter++ +} + +func (c *counter) Exit(ctx context.Context) { + c.Lock() + defer c.Unlock() + c.exit++ +} + func TestLimiter(t *testing.T) { tests := []struct { - name string - concurrency int - maxConcurrency int -
iterations int - delay time.Duration - buckets int - wantMax int + name string + concurrency int + maxConcurrency int + iterations int + delay time.Duration + buckets int + wantMax int + wantMonitorCalls int }{ { - name: "single", - concurrency: 1, - maxConcurrency: 1, - iterations: 1, - delay: 1 * time.Millisecond, - buckets: 1, - wantMax: 1, + name: "single", + concurrency: 1, + maxConcurrency: 1, + iterations: 1, + delay: 1 * time.Millisecond, + buckets: 1, + wantMax: 1, + wantMonitorCalls: 1, }, { - name: "two-at-a-time", - concurrency: 100, - maxConcurrency: 2, - iterations: 10, - delay: 1 * time.Millisecond, - buckets: 1, - wantMax: 2, + name: "two-at-a-time", + concurrency: 100, + maxConcurrency: 2, + iterations: 10, + delay: 1 * time.Millisecond, + buckets: 1, + wantMax: 2, + wantMonitorCalls: 100 * 10, }, { - name: "two-by-two", - concurrency: 100, - maxConcurrency: 2, - delay: 1 * time.Nanosecond, - iterations: 4, - buckets: 2, - wantMax: 4, + name: "two-by-two", + concurrency: 100, + maxConcurrency: 2, + delay: 1000 * time.Nanosecond, + iterations: 4, + buckets: 2, + wantMax: 4, + wantMonitorCalls: 100 * 4, }, { - name: "no-limit", - concurrency: 10, - maxConcurrency: 0, - iterations: 200, - delay: 1000 * time.Nanosecond, - buckets: 1, - wantMax: 10, + name: "no-limit", + concurrency: 10, + maxConcurrency: 0, + iterations: 200, + delay: 1000 * time.Nanosecond, + buckets: 1, + wantMax: 10, + wantMonitorCalls: 0, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - limiter := NewLimiter(nil) + gauge := &counter{} + + limiter := NewLimiter(tt.maxConcurrency, gauge) wg := sync.WaitGroup{} wg.Add(tt.concurrency) - gauge := &counter{} + // We know of an edge case that can lead to the rate limiter + // occasionally letting one or two extra goroutines run + // concurrently.
+ wantMaxUpperBound := tt.wantMax + 2 + for c := 0; c < tt.concurrency; c++ { go func() { for i := 0; i < tt.iterations; i++ { lockKey := fmt.Sprintf("key:%v", i%tt.buckets) - limiter.Limit(context.Background(), lockKey, tt.maxConcurrency, func() (interface{}, error) { + limiter.Limit(context.Background(), lockKey, func() (interface{}, error) { gauge.up() - assert.True(t, len(limiter.v) <= tt.maxConcurrency) - assert.True(t, len(limiter.v) <= tt.buckets) + assert.True(t, gauge.current <= wantMaxUpperBound) + assert.True(t, len(limiter.semaphores) <= tt.buckets) time.Sleep(tt.delay) gauge.down() return nil, nil }) + + // Add + time.Sleep(tt.delay) } wg.Done() @@ -111,9 +153,14 @@ func TestLimiter(t *testing.T) { } wg.Wait() - assert.Equal(t, tt.wantMax, gauge.max) + assert.True(t, tt.wantMax <= gauge.max && gauge.max <= wantMaxUpperBound) assert.Equal(t, 0, gauge.current) - assert.Equal(t, 0, len(limiter.v)) + assert.Equal(t, 0, len(limiter.semaphores)) + + assert.Equal(t, tt.wantMonitorCalls, gauge.enter) + assert.Equal(t, tt.wantMonitorCalls, gauge.exit) + assert.Equal(t, tt.wantMonitorCalls, gauge.queued) + assert.Equal(t, tt.wantMonitorCalls, gauge.dequeued) }) } } diff --git a/internal/middleware/limithandler/limithandler.go b/internal/middleware/limithandler/limithandler.go index 1c7dbd60ea..bd924659b7 100644 --- a/internal/middleware/limithandler/limithandler.go +++ b/internal/middleware/limithandler/limithandler.go @@ -8,12 +8,7 @@ import ( // LimiterMiddleware contains rate limiter state type LimiterMiddleware struct { - rpcLimiterConfigs map[string]*rpcLimiterConfig -} - -type rpcLimiterConfig struct { - limiter ConcurrencyLimiter - max int + methodLimiters map[string]*ConcurrencyLimiter } var maxConcurrencyPerRepoPerRPC map[string]int @@ -41,13 +36,13 @@ func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor { return handler(ctx, req) } - config := c.rpcLimiterConfigs[info.FullMethod] - if config == nil { + limiter := c.methodLimiters[info.FullMethod] + if limiter == nil { // No concurrency limiting return handler(ctx, req) } - return config.limiter.Limit(ctx, repoPath, config.max, func() (interface{}, error) { + return limiter.Limit(ctx, repoPath, func() (interface{}, error) { return handler(ctx, req) }) } @@ -63,13 +58,13 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { return handler(srv, stream) } - config := c.rpcLimiterConfigs[info.FullMethod] - if config == nil { + limiter := c.methodLimiters[info.FullMethod] + if limiter == nil { // No concurrency limiting return handler(srv, stream) } - _, err := config.limiter.Limit(ctx, repoPath, config.max, func() (interface{}, error) { + _, err := limiter.Limit(ctx, repoPath, func() (interface{}, error) { return nil, handler(srv, stream) }) @@ -80,17 +75,18 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { // New creates a new rate limiter func New() LimiterMiddleware { return LimiterMiddleware{ - rpcLimiterConfigs: createLimiterConfig(), + methodLimiters: createLimiterConfig(), } } -func createLimiterConfig() map[string]*rpcLimiterConfig { - m := make(map[string]*rpcLimiterConfig) - for k, v := range maxConcurrencyPerRepoPerRPC { - m[k] = &rpcLimiterConfig{limiter: NewLimiter(newPromMonitor(k)), max: v} +func createLimiterConfig() map[string]*ConcurrencyLimiter { + result := make(map[string]*ConcurrencyLimiter) + + for fullMethodName, max := range maxConcurrencyPerRepoPerRPC { + result[fullMethodName] = NewLimiter(max, 
newPromMonitor(fullMethodName)) } - return m + return result } // SetMaxRepoConcurrency configures the max concurrency per repo per RPC -- GitLab From 51b873f49a3150c908e59fc94423fb6688c844d9 Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Thu, 28 Sep 2017 17:39:48 +0100 Subject: [PATCH 13/19] Added an extra test for a large number of buckets --- .../limithandler/concurrency_limiter_test.go | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index 406b794c6d..e35ced2470 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -70,7 +70,7 @@ func TestLimiter(t *testing.T) { iterations int delay time.Duration buckets int - wantMax int + wantMaxRange []int wantMonitorCalls int }{ { @@ -80,7 +80,7 @@ func TestLimiter(t *testing.T) { iterations: 1, delay: 1 * time.Millisecond, buckets: 1, - wantMax: 1, + wantMaxRange: []int{1, 1}, wantMonitorCalls: 1, }, { @@ -90,7 +90,7 @@ func TestLimiter(t *testing.T) { iterations: 10, delay: 1 * time.Millisecond, buckets: 1, - wantMax: 2, + wantMaxRange: []int{2, 4}, wantMonitorCalls: 100 * 10, }, { @@ -100,7 +100,7 @@ func TestLimiter(t *testing.T) { delay: 1000 * time.Nanosecond, iterations: 4, buckets: 2, - wantMax: 4, + wantMaxRange: []int{4, 6}, wantMonitorCalls: 100 * 4, }, { @@ -110,9 +110,19 @@ func TestLimiter(t *testing.T) { iterations: 200, delay: 1000 * time.Nanosecond, buckets: 1, - wantMax: 10, + wantMaxRange: []int{10, 10}, wantMonitorCalls: 0, }, + { + name: "wide-spread", + concurrency: 100, + maxConcurrency: 2, + delay: 100 * time.Nanosecond, + iterations: 40, + buckets: 50, + wantMaxRange: []int{30, 100}, + wantMonitorCalls: 100 * 40, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -125,8 +135,6 @@ func TestLimiter(t *testing.T) { // We know of an edge case that can lead to the rate limiter // occasionally letting one or two extra goroutines run // concurrently.
- wantMaxUpperBound := tt.wantMax + 2 - for c := 0; c < tt.concurrency; c++ { go func() { @@ -136,15 +144,14 @@ func TestLimiter(t *testing.T) { limiter.Limit(context.Background(), lockKey, func() (interface{}, error) { gauge.up() - assert.True(t, gauge.current <= wantMaxUpperBound) - assert.True(t, len(limiter.semaphores) <= tt.buckets) + assert.True(t, gauge.current <= tt.wantMaxRange[1], "Expected the number of concurrent operations (%v) to not exceed the maximum concurrency (%v)", gauge.current, tt.wantMaxRange[1]) + assert.True(t, len(limiter.semaphores) <= tt.buckets, "Expected the number of semaphores (%v) to be lte number of buckets (%v)", len(limiter.semaphores), tt.buckets) time.Sleep(tt.delay) gauge.down() return nil, nil }) - // Add time.Sleep(tt.delay) } @@ -153,7 +160,7 @@ func TestLimiter(t *testing.T) { wg.Wait() - assert.True(t, tt.wantMax <= gauge.max && gauge.max <= wantMaxUpperBound) + assert.True(t, tt.wantMaxRange[0] <= gauge.max && gauge.max <= tt.wantMaxRange[1], "Expected maximum concurrency to be in the range [%v,%v] but got %v", tt.wantMaxRange[0], tt.wantMaxRange[1], gauge.max) assert.Equal(t, 0, gauge.current) assert.Equal(t, 0, len(limiter.semaphores)) -- GitLab From 8118cfe2b642189d19bf81bf6a06068cfd3164dd Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Thu, 28 Sep 2017 17:43:02 +0100 Subject: [PATCH 14/19] Add changelog [skip ci] --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc4fe4af26..f0cbf3bf75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ UNRELEASED +- Allow individual endpoints to be rate-limited per-repository + https://gitlab.com/gitlab-org/gitaly/merge_requests/376 - Fix path bug in CommitService::FindCommits https://gitlab.com/gitlab-org/gitaly/merge_requests/364 - Fail harder during startup, fix version string -- GitLab From 697eb93d69d02c802a4ee245ab37e7e24dc43b0a Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Thu, 28 Sep 2017 18:42:12 +0100 Subject: [PATCH 15/19] Updated the test to avoid contention on the keys --- .../limithandler/concurrency_limiter_test.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index e35ced2470..ff6c49f64f 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -1,7 +1,6 @@ package limithandler import ( - "fmt" "sync" "testing" "time" @@ -115,13 +114,13 @@ func TestLimiter(t *testing.T) { }, { name: "wide-spread", - concurrency: 100, + concurrency: 1000, maxConcurrency: 2, delay: 100 * time.Nanosecond, iterations: 40, buckets: 50, - wantMaxRange: []int{30, 100}, - wantMonitorCalls: 100 * 40, + wantMaxRange: []int{80, 120}, + wantMonitorCalls: 1000 * 40, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -136,10 +135,9 @@ func TestLimiter(t *testing.T) { // occasionally letting one or two extra goroutines run // concurrently.
for c := 0; c < tt.concurrency; c++ { - go func() { - + go func(counter int) { for i := 0; i < tt.iterations; i++ { - lockKey := fmt.Sprintf("key:%v", i%tt.buckets) + lockKey := (i ^ counter) % tt.buckets limiter.Limit(context.Background(), lockKey, func() (interface{}, error) { gauge.up() @@ -156,7 +154,7 @@ func TestLimiter(t *testing.T) { } wg.Done() - }() + }(c) } wg.Wait() -- GitLab From 69ff76ff39fa8d6ec2cd681324b6dfcd6064b12d Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Fri, 29 Sep 2017 10:23:50 +0100 Subject: [PATCH 16/19] Semaphore map keyed by string --- .../middleware/limithandler/concurrency_limiter.go | 10 +++++----- .../limithandler/concurrency_limiter_test.go | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index 0cc86bbd73..f23f1d2e5d 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -25,14 +25,14 @@ type ConcurrencyLimiter struct { // When locking the locker requests 1 or more slots to be locked. // In this package, the number of slots is the number of concurrent requests the rate limiter lets through. // https://godoc.org/golang.org/x/sync/semaphore - semaphores map[interface{}]*semaphore.Weighted + semaphores map[string]*semaphore.Weighted max int64 mux *sync.Mutex monitor ConcurrencyMonitor } // Lazy create a semaphore for the given key -func (c *ConcurrencyLimiter) getSemaphore(lockKey interface{}) *semaphore.Weighted { +func (c *ConcurrencyLimiter) getSemaphore(lockKey string) *semaphore.Weighted { c.mux.Lock() defer c.mux.Unlock() @@ -46,7 +46,7 @@ func (c *ConcurrencyLimiter) getSemaphore(lockKey interface{}) *semaphore.Weight return w } -func (c *ConcurrencyLimiter) attemptCollection(lockKey interface{}) { +func (c *ConcurrencyLimiter) attemptCollection(lockKey string) { c.mux.Lock() defer c.mux.Unlock() @@ -68,7 +68,7 @@ func (c *ConcurrencyLimiter) attemptCollection(lockKey interface{}) { } // Limit will limit the concurrency of f -func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey interface{}, f LimitedFunc) (interface{}, error) { +func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) { if c.max <= 0 { return f() } @@ -105,7 +105,7 @@ func NewLimiter(max int, monitor ConcurrencyMonitor) *ConcurrencyLimiter { } return &ConcurrencyLimiter{ - semaphores: make(map[interface{}]*semaphore.Weighted), + semaphores: make(map[string]*semaphore.Weighted), max: int64(max), mux: &sync.Mutex{}, monitor: monitor, diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index ff6c49f64f..f0141fe1bc 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -1,6 +1,7 @@ package limithandler import ( + "strconv" "sync" "testing" "time" @@ -137,7 +138,7 @@ func TestLimiter(t *testing.T) { for c := 0; c < tt.concurrency; c++ { go func(counter int) { for i := 0; i < tt.iterations; i++ { - lockKey := (i ^ counter) % tt.buckets + lockKey := strconv.Itoa((i ^ counter) % tt.buckets) limiter.Limit(context.Background(), lockKey, func() (interface{}, error) { gauge.up() -- GitLab From 7930606eeccf5977f81ead5e732a22845b20b449 Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Fri, 29 Sep 2017 10:36:55 +0100 Subject: [PATCH 17/19] Safe 
semaphore counts --- .../limithandler/concurrency_limiter.go | 7 ++++++ .../limithandler/concurrency_limiter_test.go | 24 ++++++++++--------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index f23f1d2e5d..56c67ae89a 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -67,6 +67,13 @@ func (c *ConcurrencyLimiter) attemptCollection(lockKey string) { delete(c.semaphores, lockKey) } +func (c *ConcurrencyLimiter) countSemaphores() int { + c.mux.Lock() + defer c.mux.Unlock() + + return len(c.semaphores) +} + // Limit will limit the concurrency of f func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) { if c.max <= 0 { diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index f0141fe1bc..5f1c412b32 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -90,7 +90,7 @@ func TestLimiter(t *testing.T) { iterations: 10, delay: 1 * time.Millisecond, buckets: 1, - wantMaxRange: []int{2, 4}, + wantMaxRange: []int{2, 3}, wantMonitorCalls: 100 * 10, }, { @@ -100,7 +100,7 @@ func TestLimiter(t *testing.T) { delay: 1000 * time.Nanosecond, iterations: 4, buckets: 2, - wantMaxRange: []int{4, 6}, + wantMaxRange: []int{4, 5}, wantMonitorCalls: 100 * 4, }, { @@ -114,13 +114,15 @@ func TestLimiter(t *testing.T) { wantMonitorCalls: 0, }, { - name: "wide-spread", - concurrency: 1000, - maxConcurrency: 2, - delay: 100 * time.Nanosecond, - iterations: 40, - buckets: 50, - wantMaxRange: []int{80, 120}, + name: "wide-spread", + concurrency: 1000, + maxConcurrency: 2, + delay: 100 * time.Nanosecond, + iterations: 40, + buckets: 50, + // Intentionally leaving the max low because CI runners + // may struggle to do 80 things in parallel + wantMaxRange: []int{80, 102}, wantMonitorCalls: 1000 * 40, }, } @@ -144,7 +146,7 @@ func TestLimiter(t *testing.T) { gauge.up() assert.True(t, gauge.current <= tt.wantMaxRange[1], "Expected the number of concurrent operations (%v) to not exceed the maximum concurrency (%v)", gauge.current, tt.wantMaxRange[1]) - assert.True(t, len(limiter.semaphores) <= tt.buckets, "Expected the number of semaphores (%v) to be lte number of buckets (%v)", len(limiter.semaphores), tt.buckets) + assert.True(t, limiter.countSemaphores() <= tt.buckets, "Expected the number of semaphores (%v) to be lte number of buckets (%v)", len(limiter.semaphores), tt.buckets) time.Sleep(tt.delay) gauge.down() @@ -161,7 +163,7 @@ func TestLimiter(t *testing.T) { wg.Wait() assert.True(t, tt.wantMaxRange[0] <= gauge.max && gauge.max <= tt.wantMaxRange[1], "Expected maximum concurrency to be in the range [%v,%v] but got %v", tt.wantMaxRange[0], tt.wantMaxRange[1], gauge.max) assert.Equal(t, 0, gauge.current) - assert.Equal(t, 0, len(limiter.semaphores)) + assert.Equal(t, 0, limiter.countSemaphores()) assert.Equal(t, tt.wantMonitorCalls, gauge.enter) assert.Equal(t, tt.wantMonitorCalls, gauge.exit) -- GitLab From 1279ae966e234fd5d44a8b8505f0f33c09120eb8 Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Fri, 29 Sep 2017 11:08:20 +0100 Subject: [PATCH 18/19] Fixed unlock field access --- internal/middleware/limithandler/concurrency_limiter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index 5f1c412b32..6f71613232 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -146,7 +146,7 @@ func TestLimiter(t *testing.T) { gauge.up() assert.True(t, gauge.current <= tt.wantMaxRange[1], "Expected the number of concurrent operations (%v) to not exceed the maximum concurrency (%v)", gauge.current, tt.wantMaxRange[1]) - assert.True(t, limiter.countSemaphores() <= tt.buckets, "Expected the number of semaphores (%v) to be lte number of buckets (%v)", len(limiter.semaphores), tt.buckets) + assert.True(t, limiter.countSemaphores() <= tt.buckets, "Expected the number of semaphores (%v) to be lte number of buckets (%v)", limiter.countSemaphores(), tt.buckets) time.Sleep(tt.delay) gauge.down() -- GitLab From 7d10989362b82ed7e0bed349a97e255072f663aa Mon Sep 17 00:00:00 2001 From: Andrew Newdigate Date: Fri, 29 Sep 2017 11:11:12 +0100 Subject: [PATCH 19/19] Less confusing count of monitor calls --- .../limithandler/concurrency_limiter_test.go | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go index 6f71613232..f045c2c312 100644 --- a/internal/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/middleware/limithandler/concurrency_limiter_test.go @@ -71,7 +71,7 @@ func TestLimiter(t *testing.T) { delay time.Duration buckets int wantMaxRange []int - wantMonitorCalls int + wantMonitorCalls bool }{ { name: "single", @@ -81,7 +81,7 @@ func TestLimiter(t *testing.T) { delay: 1 * time.Millisecond, buckets: 1, wantMaxRange: []int{1, 1}, - wantMonitorCalls: 1, + wantMonitorCalls: true, }, { name: "two-at-a-time", @@ -91,7 +91,7 @@ func TestLimiter(t *testing.T) { delay: 1 * time.Millisecond, buckets: 1, wantMaxRange: []int{2, 3}, - wantMonitorCalls: 100 * 10, + wantMonitorCalls: true, }, { name: "two-by-two", @@ -101,7 +101,7 @@ func TestLimiter(t *testing.T) { iterations: 4, buckets: 2, wantMaxRange: []int{4, 5}, - wantMonitorCalls: 100 * 4, + wantMonitorCalls: true, }, { name: "no-limit", @@ -111,7 +111,7 @@ func TestLimiter(t *testing.T) { delay: 1000 * time.Nanosecond, buckets: 1, wantMaxRange: []int{10, 10}, - wantMonitorCalls: 0, + wantMonitorCalls: false, }, { name: "wide-spread", @@ -123,7 +123,7 @@ func TestLimiter(t *testing.T) { // Intentionally leaving the max low because CI runners // may struggle to do 80 things in parallel wantMaxRange: []int{80, 102}, - wantMonitorCalls: 1000 * 40, + wantMonitorCalls: true, }, } for _, tt := range tests { @@ -165,10 +165,17 @@ func TestLimiter(t *testing.T) { assert.Equal(t, 0, gauge.current) assert.Equal(t, 0, limiter.countSemaphores()) - assert.Equal(t, tt.wantMonitorCalls, gauge.enter) - assert.Equal(t, tt.wantMonitorCalls, gauge.exit) - assert.Equal(t, tt.wantMonitorCalls, gauge.queued) - assert.Equal(t, tt.wantMonitorCalls, gauge.dequeued) + var wantMonitorCallCount int + if tt.wantMonitorCalls { + wantMonitorCallCount = tt.concurrency * tt.iterations + } else { + wantMonitorCallCount = 0 + } + + assert.Equal(t, wantMonitorCallCount, gauge.enter) + assert.Equal(t, wantMonitorCallCount, gauge.exit) + assert.Equal(t, wantMonitorCallCount, gauge.queued) + assert.Equal(t, wantMonitorCallCount, gauge.dequeued) }) } } -- GitLab
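
The pattern this series converges on — one weighted semaphore per lock key, a single shared maximum, and opportunistic garbage collection of idle semaphores — can be condensed into a short standalone sketch. This is an illustrative summary, not the Gitaly code itself: the names keyedLimiter, newKeyedLimiter, limit and the demo in main are invented for this sketch, while the semaphore calls (NewWeighted, Acquire, TryAcquire, Release) are the real golang.org/x/sync/semaphore API the patches use.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

// keyedLimiter caps how many calls may run concurrently for each key,
// mirroring the per-repository limiting in the patches above.
type keyedLimiter struct {
	mux        sync.Mutex
	semaphores map[string]*semaphore.Weighted
	max        int64 // shared slot count for every key
}

func newKeyedLimiter(max int64) *keyedLimiter {
	return &keyedLimiter{semaphores: make(map[string]*semaphore.Weighted), max: max}
}

// getSemaphore lazily creates the semaphore for a key.
func (l *keyedLimiter) getSemaphore(key string) *semaphore.Weighted {
	l.mux.Lock()
	defer l.mux.Unlock()

	if s, ok := l.semaphores[key]; ok {
		return s
	}
	s := semaphore.NewWeighted(l.max)
	l.semaphores[key] = s
	return s
}

// attemptCollection drops a semaphore that is no longer in use. Grabbing
// all max slots at once can only succeed when no goroutine holds or waits
// on the semaphore; releasing them again before deleting the map entry
// keeps goroutines that already hold a reference from wedging.
func (l *keyedLimiter) attemptCollection(key string) {
	l.mux.Lock()
	defer l.mux.Unlock()

	s := l.semaphores[key]
	if s == nil || !s.TryAcquire(l.max) {
		return
	}
	s.Release(l.max)
	delete(l.semaphores, key)
}

// limit runs f once a slot for key is free, or returns an error if ctx is
// cancelled while queued.
func (l *keyedLimiter) limit(ctx context.Context, key string, f func() error) error {
	if l.max <= 0 {
		return f() // limiting disabled, as in the max <= 0 branch of Limit
	}

	s := l.getSemaphore(key)
	defer l.attemptCollection(key)

	if err := s.Acquire(ctx, 1); err != nil {
		return err
	}
	defer s.Release(1)

	return f()
}

func main() {
	limiter := newKeyedLimiter(2) // at most two concurrent calls per key
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			limiter.limit(context.Background(), "repo-a", func() error {
				fmt.Println("worker", i, "running")
				time.Sleep(10 * time.Millisecond)
				return nil
			})
		}(i)
	}
	wg.Wait()
}

Because every key shares the same max, dropping an idle semaphore and lazily recreating it later is harmless — the recreated semaphore always has identical capacity — which is what made the weightedWithSize wrapper removable in PATCH 12.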