From bddecd1c6984b2ed3fbc666c64b4e32d2e531889 Mon Sep 17 00:00:00 2001 From: Henri Philipps Date: Sun, 10 Mar 2019 16:29:39 +0100 Subject: [PATCH 1/2] add circuit breaker package to labkit --- circuitbreaker/doc.go | 27 ++ circuitbreaker/examples_test.go | 87 +++++ circuitbreaker/executor/executor.go | 28 ++ circuitbreaker/executor/options.go | 80 ++++ circuitbreaker/failer.go | 17 + circuitbreaker/options.go | 47 +++ circuitbreaker/roundtripper.go | 43 +++ circuitbreaker/roundtripper_test.go | 157 ++++++++ go.mod | 1 + go.sum | 2 + vendor/github.com/sony/gobreaker/.travis.yml | 15 + vendor/github.com/sony/gobreaker/LICENSE | 21 ++ vendor/github.com/sony/gobreaker/README.md | 128 +++++++ vendor/github.com/sony/gobreaker/gobreaker.go | 344 ++++++++++++++++++ vendor/modules.txt | 2 + 15 files changed, 999 insertions(+) create mode 100644 circuitbreaker/doc.go create mode 100644 circuitbreaker/examples_test.go create mode 100644 circuitbreaker/executor/executor.go create mode 100644 circuitbreaker/executor/options.go create mode 100644 circuitbreaker/failer.go create mode 100644 circuitbreaker/options.go create mode 100644 circuitbreaker/roundtripper.go create mode 100644 circuitbreaker/roundtripper_test.go create mode 100644 vendor/github.com/sony/gobreaker/.travis.yml create mode 100644 vendor/github.com/sony/gobreaker/LICENSE create mode 100644 vendor/github.com/sony/gobreaker/README.md create mode 100644 vendor/github.com/sony/gobreaker/gobreaker.go diff --git a/circuitbreaker/doc.go b/circuitbreaker/doc.go new file mode 100644 index 0000000..8040981 --- /dev/null +++ b/circuitbreaker/doc.go @@ -0,0 +1,27 @@ +/* +Package circuitbreaker is providing circuit breaker functionality for labkit. + +A circuit breaker is useful to protect backends from thundering herds in case of failures +or high load by failing fast and creating backpressure on the clients. By failing fast we +also can prevent cascading failures caused by thread pool exhaustion on the clients. 
+ +This package is providing a http.RoundTripper middleware 'NewRoundTripper()' which wraps a delegate +RoundTripper with a circuit breaker. + +It only provides two simple public interfaces and hides implementation-specific APIs of the underlying +circuit breaker: + +Executor - which controls the execution of a request with a circuit breaker + +Failer - which determines if a request succeeded or failed + +The underlying circuit breaker (currently github.com/sony/gobreaker) can be configured using functional +options (executor.Option). + +The default circuit breaker will treat all status codes >= 400 as failure, trips after more than 5 consecutive +failures and returns into half-open state after 10s. + +Carefully test circuit breaker settings before applying them in production - they can have big and +surprising impacts on the behavior of the whole system under different load situations! +*/ +package circuitbreaker diff --git a/circuitbreaker/examples_test.go b/circuitbreaker/examples_test.go new file mode 100644 index 0000000..a13637d --- /dev/null +++ b/circuitbreaker/examples_test.go @@ -0,0 +1,87 @@ +package circuitbreaker_test + +import ( + "errors" + "fmt" + "log" + "net/http" + "time" + + "gitlab.com/gitlab-org/labkit/circuitbreaker" + "gitlab.com/gitlab-org/labkit/circuitbreaker/executor" +) + +func Example() { + + /** The short version: **/ + + tr := &http.Transport{} + client := &http.Client{Transport: circuitbreaker.NewRoundTripper(tr)} + + /** The long version: **/ + + // Initialize executor showing all available options. + // Using the default values - providing no options would have the same result. 
+ ex := executor.New( + executor.WithName(""), + executor.WithMaxRequests(1), + executor.WithResetInterval(time.Duration(0)), + executor.WithTimeout(10*time.Second), + executor.WithTripFunc(nil), + executor.WithStateChangeFunc(nil), + ) + + // A TripFunc that trips after more than 10 consecutive errors could look like this + // (see github.com/sony/gobreaker for details on Counts): + // func(c gobreaker.Counts) bool {return c.ConsecutiveFailures > 10} + + // A StateChangeFunc could look like this: + // func(name string, from gobreaker.State, to gobreaker.State) {log.Println(name, from, to)} + + // Create delegate transport + tr = &http.Transport{ + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Second, + DisableCompression: true, + } + + // Create http client wrapping the transport with a circuit breaker. + // Leaving out all options would lead to a circuit breaker with default values. + client = &http.Client{ + Transport: circuitbreaker.NewRoundTripper( + tr, + circuitbreaker.WithExecutor(ex), + circuitbreaker.WithFailer(func(res *http.Response) error { + if res.StatusCode >= 400 { + return errors.New("failed") + } + return nil + }), + ), + } + + /** Do something with the client **/ + + // we trigger the circuit breaker with more than 5 consecutive failures (6 failing requests). + req, _ := http.NewRequest("GET", "http://non-existing-domain.gitlab", nil) + + for i := 0; i <= 5; i++ { + res, err := client.Do(req) + if err == nil { + log.Panicf("expected request to fail, got response: %v", res) + } + } + + // This request should be returned by the open circuit breaker. 
+ res, err := client.Do(req) + if err == nil { + log.Panicf("expected request to fail, got response: %v", res) + } + if got, want := err.Error(), "Get http://non-existing-domain.gitlab: circuit breaker is open"; want != got { + log.Panicf("expected %s, got %s", want, got) + } + + fmt.Println("We tripped the circuit breaker") + + // Output: We tripped the circuit breaker +} diff --git a/circuitbreaker/executor/executor.go b/circuitbreaker/executor/executor.go new file mode 100644 index 0000000..91c7fb7 --- /dev/null +++ b/circuitbreaker/executor/executor.go @@ -0,0 +1,28 @@ +// Package executor is using github.com/sony/gobreaker as underlying circuit breaker implementation +// but is using the Executor interface as a facade to provide a stable API to clients if we should decide +// to use a different circuit breaker package in the future. +package executor + +import "github.com/sony/gobreaker" + +// Executor is an interface that should be able to match most circuit breaker implementations. +type Executor interface { + Execute(func() (interface{}, error)) (interface{}, error) +} + +// New is returning a new executor. +func New(opts ...Option) Executor { + + config := applyOptions(opts) + + settings := gobreaker.Settings{ + Name: config.name, + MaxRequests: config.halfOpenMaxRequests, + Interval: config.resetInterval, + Timeout: config.timeout, + ReadyToTrip: config.readyToTrip, + OnStateChange: config.onStateChange, + } + + return gobreaker.NewCircuitBreaker(settings) +} diff --git a/circuitbreaker/executor/options.go b/circuitbreaker/executor/options.go new file mode 100644 index 0000000..79d9779 --- /dev/null +++ b/circuitbreaker/executor/options.go @@ -0,0 +1,80 @@ +package executor + +import ( + "time" + + "github.com/sony/gobreaker" +) + +// The configuration for the circuit breaker. 
+type config struct { + name string + halfOpenMaxRequests uint32 + resetInterval time.Duration + timeout time.Duration + readyToTrip func(counts gobreaker.Counts) bool + onStateChange func(name string, from gobreaker.State, to gobreaker.State) +} + +// Option will configure a circuit breaker Executor. +type Option func(*config) + +func applyOptions(opts []Option) config { + + config := config{timeout: 10 * time.Second} + + for _, v := range opts { + v(&config) + } + + return config +} + +// WithName is applying a name to the circuit breaker. +// It might appear in logs when the breaker is tripped. +func WithName(name string) Option { + return func(config *config) { + config.name = name + } +} + +// WithMaxRequests is setting how many probe requests are allowed to pass through in +// half-open state. By default, the circuit breaker allows only 1 request. +func WithMaxRequests(maxRequests uint32) Option { + return func(config *config) { + config.halfOpenMaxRequests = maxRequests + } +} + +// WithResetInterval is setting the cyclic period of the closed state for the circuit breaker to +// clear the internal counts. +// By default the circuit breaker doesn't clear internal counts during the closed state. +func WithResetInterval(interval time.Duration) Option { + return func(config *config) { + config.resetInterval = interval + } +} + +// WithTimeout is setting the period of the open state, after which the state of the circuit breaker +// becomes half-open. By default, the timeout value of the circuit breaker is set to 10 seconds. +func WithTimeout(timeout time.Duration) Option { + return func(config *config) { + config.timeout = timeout + } +} + +// WithTripFunc is setting the func that will trip state changes based on Counts. +// The default trip func returns true after more than 5 consecutive failures. 
+func WithTripFunc(tf func(counts gobreaker.Counts) bool) Option { + return func(config *config) { + config.readyToTrip = tf + } +} + +// WithStateChangeFunc is setting the func that will be executed on state changes. +// Default is nil. +func WithStateChangeFunc(sf func(name string, from gobreaker.State, to gobreaker.State)) Option { + return func(config *config) { + config.onStateChange = sf + } +} diff --git a/circuitbreaker/failer.go b/circuitbreaker/failer.go new file mode 100644 index 0000000..8bcf40e --- /dev/null +++ b/circuitbreaker/failer.go @@ -0,0 +1,17 @@ +package circuitbreaker + +import ( + "fmt" + "net/http" +) + +// FailerFunc is determining if a response is treated as a failure by the circuit breaker. +type FailerFunc func(response *http.Response) error + +// defaultFailerFunc is treating all status codes of 400 and above as failure. +func defaultFailerFunc(resp *http.Response) error { + if code := resp.StatusCode; code >= 400 { + return fmt.Errorf("got status code %d - %s", code, resp.Status) + } + return nil +} diff --git a/circuitbreaker/options.go b/circuitbreaker/options.go new file mode 100644 index 0000000..49a9a6b --- /dev/null +++ b/circuitbreaker/options.go @@ -0,0 +1,47 @@ +package circuitbreaker + +import ( + "gitlab.com/gitlab-org/labkit/circuitbreaker/executor" +) + +// The configuration for the circuit breaker. +type config struct { + executor executor.Executor + failer FailerFunc +} + +func newConfig() config { + return config{ + executor: executor.New(), + failer: defaultFailerFunc, + } +} + +// Option will configure a circuit breaker. +type Option func(*config) + +func applyOptions(opts []Option) config { + config := newConfig() + for _, v := range opts { + v(&config) + } + + return config +} + +// WithExecutor is setting the Executor wrapping a circuit breaker around a function to be executed. +// The default executor is a circuit breaker which trips after more than 5 consecutive failures +// and stays open for 10s. 
+func WithExecutor(e executor.Executor) Option { + return func(config *config) { + config.executor = e + } +} + +// WithFailer is setting the FailerFunc which decides if a response is treated as a failure by the +// circuit breaker. By default all status codes equal or greater than 400 are treated as failure. +func WithFailer(f FailerFunc) Option { + return func(config *config) { + config.failer = f + } +} diff --git a/circuitbreaker/roundtripper.go b/circuitbreaker/roundtripper.go new file mode 100644 index 0000000..b570887 --- /dev/null +++ b/circuitbreaker/roundtripper.go @@ -0,0 +1,43 @@ +package circuitbreaker + +import ( + "net/http" +) + +// roundTripper is implementing http.RoundTripper and wrapping a delegate RoundTripper +// with a circuit breaker. +type roundTripper struct { + delegate http.RoundTripper + cfg config +} + +// RoundTrip is executing a request through a circuit breaker. +func (r roundTripper) RoundTrip(req *http.Request) (res *http.Response, err error) { + + response, executeErr := r.cfg.executor.Execute(func() (interface{}, error) { + + roundtripRes, roundTripErr := r.delegate.RoundTrip(req) + if roundTripErr != nil { + return nil, roundTripErr + } + + // we got a roundtrip and use failer to check if the response should be treated as failure + return roundtripRes, r.cfg.failer(roundtripRes) + }) + + if response != nil { + // if the response isn't nil we got a successful roundtrip, but need to ignore + // possible executor errors that are caused by 4xx or 5xx responses + return response.(*http.Response), nil + } + + return nil, executeErr +} + +// NewRoundTripper is creating a RoundTripper which is wrapping a circuit breaker around +// the delegate RoundTripper. 
+func NewRoundTripper(delegate http.RoundTripper, opts ...Option) http.RoundTripper { + config := applyOptions(opts) + + return &roundTripper{delegate: delegate, cfg: config} +} diff --git a/circuitbreaker/roundtripper_test.go b/circuitbreaker/roundtripper_test.go new file mode 100644 index 0000000..6428fef --- /dev/null +++ b/circuitbreaker/roundtripper_test.go @@ -0,0 +1,157 @@ +package circuitbreaker + +import ( + "errors" + "net/http" + "testing" + "time" + + "gitlab.com/gitlab-org/labkit/circuitbreaker/executor" +) + +type delegatedRoundTripper struct { + delegate func(req *http.Request) (*http.Response, error) +} + +func (c delegatedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return c.delegate(req) +} + +func roundTripperFunc(delegate func(req *http.Request) (*http.Response, error)) http.RoundTripper { + return &delegatedRoundTripper{delegate} +} + +func TestSucceedingRoundTripper(t *testing.T) { + mockTransport := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{StatusCode: 200}, nil + }) + + client := &http.Client{ + Transport: NewRoundTripper(mockTransport, WithExecutor(executor.New())), + } + + req, err := http.NewRequest("GET", "http://example.com", nil) + if err != nil { + t.Fatal(err) + } + + for i := 0; i <= 10; i++ { + res, err := client.Do(req) + if err != nil { + t.Fatalf("expected request to succeed, got %s, response: %v", err, res) + } + } +} + +func TestStatusCode400RoundTripper(t *testing.T) { + + // time for waiting in open state before passing through new requests in half-open state + timeout := 200 * time.Millisecond + + mockTransport := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{StatusCode: 400}, nil + }) + + client := &http.Client{ + Transport: NewRoundTripper(mockTransport, WithExecutor(executor.New(executor.WithTimeout(timeout)))), + } + + req, err := http.NewRequest("GET", "http://example.com", nil) + if err != nil { + 
t.Fatal(err) + } + + for i := 0; i <= 5; i++ { + res, err := client.Do(req) + if err != nil { + t.Fatalf("expected request to succeed with status code 400, got %s, response: %v", err, res) + } + } + + // next request should be rejected by an open circuit breaker + res, err := client.Do(req) + if err == nil { + t.Fatalf("expected request to fail because of open circuit breaker, got %v", res) + } + if got, want := err.Error(), "Get http://example.com: circuit breaker is open"; want != got { + t.Fatalf("Expected error string %s, got %s", want, got) + } + + // wait for circuit breaker to become half-open + time.Sleep(timeout) + + // next request should be passed through because circuit breaker is half-open again + res, err = client.Do(req) + if err != nil { + t.Fatalf("expected request to succeed with status code 400, got %s, response: %v", err, res) + } + + // next request should be rejected by an open circuit breaker again because the previous one failed with 400 + res, err = client.Do(req) + if err == nil { + t.Fatalf("expected request to fail because of open circuit breaker, got %v", res) + } + if got, want := err.Error(), "Get http://example.com: circuit breaker is open"; want != got { + t.Fatalf("Expected error string %s, got %s", want, got) + } +} + +func TestFailingRoundTripper(t *testing.T) { + + // time for waiting in open state before passing through new requests in half-open state + timeout := 200 * time.Millisecond + + mockTransport := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + return nil, errors.New("injected error") + }) + + client := &http.Client{ + Transport: NewRoundTripper(mockTransport, WithExecutor(executor.New(executor.WithTimeout(timeout)))), + } + + req, err := http.NewRequest("GET", "http://example.com", nil) + if err != nil { + t.Fatal(err) + } + + for i := 0; i <= 5; i++ { + res, err := client.Do(req) + if err == nil { + t.Fatalf("expected request to fail, got response: %v", res) + } + if got, want := err.Error(), 
"Get http://example.com: injected error"; want != got { + t.Fatalf("Expected error string %s, got %s", want, got) + } + + } + + // next request should be rejected by an open circuit breaker because of the 5 injected errors before + res, err := client.Do(req) + if err == nil { + t.Fatalf("expected request to fail because of open circuit breaker, got %v", res) + } + if got, want := err.Error(), "Get http://example.com: circuit breaker is open"; want != got { + t.Fatalf("Expected error string %s, got %s", want, got) + } + + // wait for circuit breaker to become half-open + time.Sleep(timeout) + + // next request should be passed through and fail again with injected error because circuit breaker was half-open + res, err = client.Do(req) + if err == nil { + t.Fatalf("expected request to fail, got response: %v", res) + } + if got, want := err.Error(), "Get http://example.com: injected error"; want != got { + t.Fatalf("Expected error string %s, got %s", want, got) + } + + // next request should be rejected by an open circuit breaker again because of the previous injected error + res, err = client.Do(req) + if err == nil { + t.Fatalf("expected request to fail because of open circuit breaker, got %v", res) + } + if got, want := err.Error(), "Get http://example.com: circuit breaker is open"; want != got { + t.Fatalf("Expected error string %s, got %s", want, got) + } +} diff --git a/go.mod b/go.mod index d8ed5d8..bd336c3 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/philhofer/fwd v1.0.0 // indirect github.com/pkg/errors v0.8.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/sony/gobreaker v0.0.0-20181109014844-d928aaea92e1 github.com/stretchr/testify v1.2.2 github.com/tinylib/msgp v1.0.2 // indirect github.com/uber-go/atomic v1.3.2 // indirect diff --git a/go.sum b/go.sum index 73df863..9d345e1 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,8 @@ github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= 
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sony/gobreaker v0.0.0-20181109014844-d928aaea92e1 h1:D3g/qSqU1B7J3UsokdkjF60a43Lx5Zl8ifAjMFDjEbg= +github.com/sony/gobreaker v0.0.0-20181109014844-d928aaea92e1/go.mod h1:XvpJiTD8NibaH7z0NzyfhR1+NQDtR9F/x92xheTwC9k= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/tinylib/msgp v1.0.2 h1:DfdQrzQa7Yh2es9SuLkixqxuXS2SxsdYn0KbdrOGWD8= diff --git a/vendor/github.com/sony/gobreaker/.travis.yml b/vendor/github.com/sony/gobreaker/.travis.yml new file mode 100644 index 0000000..d76436b --- /dev/null +++ b/vendor/github.com/sony/gobreaker/.travis.yml @@ -0,0 +1,15 @@ +language: go +go: + - 1.9.x + - 1.10.x + - 1.11.x +sudo: false +before_install: + - go get -u golang.org/x/lint/golint + - go get github.com/axw/gocov/gocov + - go get github.com/mattn/goveralls +script: + - test -z "`gofmt -l .`" + - test -z "`golint ./...`" + - $GOPATH/bin/goveralls -service=travis-ci + - cd example && go build -o http_breaker && ./http_breaker diff --git a/vendor/github.com/sony/gobreaker/LICENSE b/vendor/github.com/sony/gobreaker/LICENSE new file mode 100644 index 0000000..81795bf --- /dev/null +++ b/vendor/github.com/sony/gobreaker/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright 2015 Sony Corporation + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is 
+furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/github.com/sony/gobreaker/README.md b/vendor/github.com/sony/gobreaker/README.md new file mode 100644 index 0000000..ce7d7a7 --- /dev/null +++ b/vendor/github.com/sony/gobreaker/README.md @@ -0,0 +1,128 @@ +gobreaker +========= + +[![GoDoc](https://godoc.org/github.com/sony/gobreaker?status.svg)](http://godoc.org/github.com/sony/gobreaker) +[![Build Status](https://travis-ci.org/sony/gobreaker.svg?branch=master)](https://travis-ci.org/sony/gobreaker) +[![Coverage Status](https://coveralls.io/repos/sony/gobreaker/badge.svg?branch=master&service=github)](https://coveralls.io/github/sony/gobreaker?branch=master) + +[gobreaker][repo-url] implements the [Circuit Breaker pattern](https://msdn.microsoft.com/en-us/library/dn589784.aspx) in Go. + +Installation +------------ + +``` +go get github.com/sony/gobreaker +``` + +Usage +----- + +The struct `CircuitBreaker` is a state machine to prevent sending requests that are likely to fail. +The function `NewCircuitBreaker` creates a new `CircuitBreaker`. 
+ +```go +func NewCircuitBreaker(st Settings) *CircuitBreaker +``` + +You can configure `CircuitBreaker` by the struct `Settings`: + +```go +type Settings struct { + Name string + MaxRequests uint32 + Interval time.Duration + Timeout time.Duration + ReadyToTrip func(counts Counts) bool + OnStateChange func(name string, from State, to State) +} +``` + +- `Name` is the name of the `CircuitBreaker`. + +- `MaxRequests` is the maximum number of requests allowed to pass through + when the `CircuitBreaker` is half-open. + If `MaxRequests` is 0, `CircuitBreaker` allows only 1 request. + +- `Interval` is the cyclic period of the closed state + for `CircuitBreaker` to clear the internal `Counts`, described later in this section. + If `Interval` is 0, `CircuitBreaker` doesn't clear the internal `Counts` during the closed state. + +- `Timeout` is the period of the open state, + after which the state of `CircuitBreaker` becomes half-open. + If `Timeout` is 0, the timeout value of `CircuitBreaker` is set to 60 seconds. + +- `ReadyToTrip` is called with a copy of `Counts` whenever a request fails in the closed state. + If `ReadyToTrip` returns true, `CircuitBreaker` will be placed into the open state. + If `ReadyToTrip` is `nil`, default `ReadyToTrip` is used. + Default `ReadyToTrip` returns true when the number of consecutive failures is more than 5. + +- `OnStateChange` is called whenever the state of `CircuitBreaker` changes. + +The struct `Counts` holds the numbers of requests and their successes/failures: + +```go +type Counts struct { + Requests uint32 + TotalSuccesses uint32 + TotalFailures uint32 + ConsecutiveSuccesses uint32 + ConsecutiveFailures uint32 +} +``` + +`CircuitBreaker` clears the internal `Counts` either +on the change of the state or at the closed-state intervals. +`Counts` ignores the results of the requests sent before clearing. 
+ +`CircuitBreaker` can wrap any function to send a request: + +```go +func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) +``` + +The method `Execute` runs the given request if `CircuitBreaker` accepts it. +`Execute` returns an error instantly if `CircuitBreaker` rejects the request. +Otherwise, `Execute` returns the result of the request. +If a panic occurs in the request, `CircuitBreaker` handles it as an error +and causes the same panic again. + +Example +------- + +```go +var cb *breaker.CircuitBreaker + +func Get(url string) ([]byte, error) { + body, err := cb.Execute(func() (interface{}, error) { + resp, err := http.Get(url) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + return body, nil + }) + if err != nil { + return nil, err + } + + return body.([]byte), nil +} +``` + +See [example](https://github.com/sony/gobreaker/blob/master/example) for details. + +License +------- + +The MIT License (MIT) + +See [LICENSE](https://github.com/sony/gobreaker/blob/master/LICENSE) for details. + + +[repo-url]: https://github.com/sony/gobreaker diff --git a/vendor/github.com/sony/gobreaker/gobreaker.go b/vendor/github.com/sony/gobreaker/gobreaker.go new file mode 100644 index 0000000..faea658 --- /dev/null +++ b/vendor/github.com/sony/gobreaker/gobreaker.go @@ -0,0 +1,344 @@ +// Package gobreaker implements the Circuit Breaker pattern. +// See https://msdn.microsoft.com/en-us/library/dn589784.aspx. +package gobreaker + +import ( + "errors" + "fmt" + "sync" + "time" +) + +// State is a type that represents a state of CircuitBreaker. +type State int + +// These constants are states of CircuitBreaker. 
+const ( + StateClosed State = iota + StateHalfOpen + StateOpen +) + +var ( + // ErrTooManyRequests is returned when the CB state is half open and the requests count is over the cb maxRequests + ErrTooManyRequests = errors.New("too many requests") + // ErrOpenState is returned when the CB state is open + ErrOpenState = errors.New("circuit breaker is open") +) + +// String implements stringer interface. +func (s State) String() string { + switch s { + case StateClosed: + return "closed" + case StateHalfOpen: + return "half-open" + case StateOpen: + return "open" + default: + return fmt.Sprintf("unknown state: %d", s) + } +} + +// Counts holds the numbers of requests and their successes/failures. +// CircuitBreaker clears the internal Counts either +// on the change of the state or at the closed-state intervals. +// Counts ignores the results of the requests sent before clearing. +type Counts struct { + Requests uint32 + TotalSuccesses uint32 + TotalFailures uint32 + ConsecutiveSuccesses uint32 + ConsecutiveFailures uint32 +} + +func (c *Counts) onRequest() { + c.Requests++ +} + +func (c *Counts) onSuccess() { + c.TotalSuccesses++ + c.ConsecutiveSuccesses++ + c.ConsecutiveFailures = 0 +} + +func (c *Counts) onFailure() { + c.TotalFailures++ + c.ConsecutiveFailures++ + c.ConsecutiveSuccesses = 0 +} + +func (c *Counts) clear() { + c.Requests = 0 + c.TotalSuccesses = 0 + c.TotalFailures = 0 + c.ConsecutiveSuccesses = 0 + c.ConsecutiveFailures = 0 +} + +// Settings configures CircuitBreaker: +// +// Name is the name of the CircuitBreaker. +// +// MaxRequests is the maximum number of requests allowed to pass through +// when the CircuitBreaker is half-open. +// If MaxRequests is 0, the CircuitBreaker allows only 1 request. +// +// Interval is the cyclic period of the closed state +// for the CircuitBreaker to clear the internal Counts. +// If Interval is 0, the CircuitBreaker doesn't clear internal Counts during the closed state. 
+// +// Timeout is the period of the open state, +// after which the state of the CircuitBreaker becomes half-open. +// If Timeout is 0, the timeout value of the CircuitBreaker is set to 60 seconds. +// +// ReadyToTrip is called with a copy of Counts whenever a request fails in the closed state. +// If ReadyToTrip returns true, the CircuitBreaker will be placed into the open state. +// If ReadyToTrip is nil, default ReadyToTrip is used. +// Default ReadyToTrip returns true when the number of consecutive failures is more than 5. +// +// OnStateChange is called whenever the state of the CircuitBreaker changes. +type Settings struct { + Name string + MaxRequests uint32 + Interval time.Duration + Timeout time.Duration + ReadyToTrip func(counts Counts) bool + OnStateChange func(name string, from State, to State) +} + +// CircuitBreaker is a state machine to prevent sending requests that are likely to fail. +type CircuitBreaker struct { + name string + maxRequests uint32 + interval time.Duration + timeout time.Duration + readyToTrip func(counts Counts) bool + onStateChange func(name string, from State, to State) + + mutex sync.Mutex + state State + generation uint64 + counts Counts + expiry time.Time +} + +// TwoStepCircuitBreaker is like CircuitBreaker but instead of surrounding a function +// with the breaker functionality, it only checks whether a request can proceed and +// expects the caller to report the outcome in a separate step using a callback. +type TwoStepCircuitBreaker struct { + cb *CircuitBreaker +} + +// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings. 
+func NewCircuitBreaker(st Settings) *CircuitBreaker { + cb := new(CircuitBreaker) + + cb.name = st.Name + cb.interval = st.Interval + cb.onStateChange = st.OnStateChange + + if st.MaxRequests == 0 { + cb.maxRequests = 1 + } else { + cb.maxRequests = st.MaxRequests + } + + if st.Timeout == 0 { + cb.timeout = defaultTimeout + } else { + cb.timeout = st.Timeout + } + + if st.ReadyToTrip == nil { + cb.readyToTrip = defaultReadyToTrip + } else { + cb.readyToTrip = st.ReadyToTrip + } + + cb.toNewGeneration(time.Now()) + + return cb +} + +// NewTwoStepCircuitBreaker returns a new TwoStepCircuitBreaker configured with the given Settings. +func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker { + return &TwoStepCircuitBreaker{ + cb: NewCircuitBreaker(st), + } +} + +const defaultTimeout = time.Duration(60) * time.Second + +func defaultReadyToTrip(counts Counts) bool { + return counts.ConsecutiveFailures > 5 +} + +// Name returns the name of the CircuitBreaker. +func (cb *CircuitBreaker) Name() string { + return cb.name +} + +// State returns the current state of the CircuitBreaker. +func (cb *CircuitBreaker) State() State { + cb.mutex.Lock() + defer cb.mutex.Unlock() + + now := time.Now() + state, _ := cb.currentState(now) + return state +} + +// Execute runs the given request if the CircuitBreaker accepts it. +// Execute returns an error instantly if the CircuitBreaker rejects the request. +// Otherwise, Execute returns the result of the request. +// If a panic occurs in the request, the CircuitBreaker handles it as an error +// and causes the same panic again. 
+func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) { + generation, err := cb.beforeRequest() + if err != nil { + return nil, err + } + + defer func() { + e := recover() + if e != nil { + cb.afterRequest(generation, false) + panic(e) + } + }() + + result, err := req() + cb.afterRequest(generation, err == nil) + return result, err +} + +// Name returns the name of the TwoStepCircuitBreaker. +func (tscb *TwoStepCircuitBreaker) Name() string { + return tscb.cb.Name() +} + +// State returns the current state of the TwoStepCircuitBreaker. +func (tscb *TwoStepCircuitBreaker) State() State { + return tscb.cb.State() +} + +// Allow checks if a new request can proceed. It returns a callback that should be used to +// register the success or failure in a separate step. If the circuit breaker doesn't allow +// requests, it returns an error. +func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error) { + generation, err := tscb.cb.beforeRequest() + if err != nil { + return nil, err + } + + return func(success bool) { + tscb.cb.afterRequest(generation, success) + }, nil +} + +func (cb *CircuitBreaker) beforeRequest() (uint64, error) { + cb.mutex.Lock() + defer cb.mutex.Unlock() + + now := time.Now() + state, generation := cb.currentState(now) + + if state == StateOpen { + return generation, ErrOpenState + } else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests { + return generation, ErrTooManyRequests + } + + cb.counts.onRequest() + return generation, nil +} + +func (cb *CircuitBreaker) afterRequest(before uint64, success bool) { + cb.mutex.Lock() + defer cb.mutex.Unlock() + + now := time.Now() + state, generation := cb.currentState(now) + if generation != before { + return + } + + if success { + cb.onSuccess(state, now) + } else { + cb.onFailure(state, now) + } +} + +func (cb *CircuitBreaker) onSuccess(state State, now time.Time) { + switch state { + case StateClosed: + cb.counts.onSuccess() + case 
StateHalfOpen: + cb.counts.onSuccess() + if cb.counts.ConsecutiveSuccesses >= cb.maxRequests { + cb.setState(StateClosed, now) + } + } +} + +func (cb *CircuitBreaker) onFailure(state State, now time.Time) { + switch state { + case StateClosed: + cb.counts.onFailure() + if cb.readyToTrip(cb.counts) { + cb.setState(StateOpen, now) + } + case StateHalfOpen: + cb.setState(StateOpen, now) + } +} + +func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) { + switch cb.state { + case StateClosed: + if !cb.expiry.IsZero() && cb.expiry.Before(now) { + cb.toNewGeneration(now) + } + case StateOpen: + if cb.expiry.Before(now) { + cb.setState(StateHalfOpen, now) + } + } + return cb.state, cb.generation +} + +func (cb *CircuitBreaker) setState(state State, now time.Time) { + if cb.state == state { + return + } + + prev := cb.state + cb.state = state + + cb.toNewGeneration(now) + + if cb.onStateChange != nil { + cb.onStateChange(cb.name, prev, state) + } +} + +func (cb *CircuitBreaker) toNewGeneration(now time.Time) { + cb.generation++ + cb.counts.clear() + + var zero time.Time + switch cb.state { + case StateClosed: + if cb.interval == 0 { + cb.expiry = zero + } else { + cb.expiry = now.Add(cb.interval) + } + case StateOpen: + cb.expiry = now.Add(cb.timeout) + default: // StateHalfOpen + cb.expiry = zero + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 89e1471..3f32c41 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -35,6 +35,8 @@ github.com/philhofer/fwd github.com/pkg/errors # github.com/pmezard/go-difflib v1.0.0 github.com/pmezard/go-difflib/difflib +# github.com/sony/gobreaker v0.0.0-20181109014844-d928aaea92e1 +github.com/sony/gobreaker # github.com/stretchr/testify v1.2.2 github.com/stretchr/testify/require github.com/stretchr/testify/assert -- GitLab From 23c990db86dcf8f1e59a396af530c086b459a609 Mon Sep 17 00:00:00 2001 From: Henri Philipps Date: Mon, 11 Mar 2019 17:37:47 +0100 Subject: [PATCH 2/2] separate executor interface 
from implementations --- circuitbreaker/examples_test.go | 16 ++++++------ circuitbreaker/executor/executor.go | 25 +++---------------- .../{options.go => gobreaker/gobreaker.go} | 23 ++++++++++++++--- circuitbreaker/options.go | 3 ++- circuitbreaker/roundtripper_test.go | 8 +++--- 5 files changed, 37 insertions(+), 38 deletions(-) rename circuitbreaker/executor/{options.go => gobreaker/gobreaker.go} (78%) diff --git a/circuitbreaker/examples_test.go b/circuitbreaker/examples_test.go index a13637d..7e94757 100644 --- a/circuitbreaker/examples_test.go +++ b/circuitbreaker/examples_test.go @@ -8,7 +8,7 @@ import ( "time" "gitlab.com/gitlab-org/labkit/circuitbreaker" - "gitlab.com/gitlab-org/labkit/circuitbreaker/executor" + "gitlab.com/gitlab-org/labkit/circuitbreaker/executor/gobreaker" ) func Example() { @@ -22,13 +22,13 @@ func Example() { // Initialize executor showing all available options. // Using the default values - providing no options would have the same result. - ex := executor.New( - executor.WithName(""), - executor.WithMaxRequests(1), - executor.WithResetInterval(time.Duration(0)), - executor.WithTimeout(10*time.Second), - executor.WithTripFunc(nil), - executor.WithStateChangeFunc(nil), + ex := gobreaker.NewExecutor( + gobreaker.WithName(""), + gobreaker.WithMaxRequests(1), + gobreaker.WithResetInterval(time.Duration(0)), + gobreaker.WithTimeout(10*time.Second), + gobreaker.WithTripFunc(nil), + gobreaker.WithStateChangeFunc(nil), ) // A TripFunc that trips after 10 consecutive errors could look like this diff --git a/circuitbreaker/executor/executor.go b/circuitbreaker/executor/executor.go index 91c7fb7..227ed04 100644 --- a/circuitbreaker/executor/executor.go +++ b/circuitbreaker/executor/executor.go @@ -1,28 +1,9 @@ -// Package executor is using github.com/sony/gobreaker as underlying circuit breaker implementation -// but is using the Executor interface as a facade to provide a stable API to clients if we should decide -// to use a different circuit 
breaker package in the future. +// Package executor is providing the Executor interface as a facade for underlying concrete +// circuit breaker implementations to provide a stable API to clients. +// Currently we only support a github.com/sony/gobreaker implementation. package executor -import "github.com/sony/gobreaker" - // Executor is an interface that should be able to match most circuit breaker implementations. type Executor interface { Execute(func() (interface{}, error)) (interface{}, error) } - -// New is returning a new executor. -func New(opts ...Option) Executor { - - config := applyOptions(opts) - - settings := gobreaker.Settings{ - Name: config.name, - MaxRequests: config.halfOpenMaxRequests, - Interval: config.resetInterval, - Timeout: config.timeout, - ReadyToTrip: config.readyToTrip, - OnStateChange: config.onStateChange, - } - - return gobreaker.NewCircuitBreaker(settings) -} diff --git a/circuitbreaker/executor/options.go b/circuitbreaker/executor/gobreaker/gobreaker.go similarity index 78% rename from circuitbreaker/executor/options.go rename to circuitbreaker/executor/gobreaker/gobreaker.go index 79d9779..2f0e219 100644 --- a/circuitbreaker/executor/options.go +++ b/circuitbreaker/executor/gobreaker/gobreaker.go @@ -1,4 +1,4 @@ -package executor +package gobreaker import ( "time" @@ -6,7 +6,24 @@ import ( "github.com/sony/gobreaker" ) -// The configuration for the circuit breaker. +// NewExecutor is returning a new *CircuitBreaker which is implementing the Executor interface. +func NewExecutor(opts ...Option) *gobreaker.CircuitBreaker { + + config := applyOptions(opts) + + settings := gobreaker.Settings{ + Name: config.name, + MaxRequests: config.halfOpenMaxRequests, + Interval: config.resetInterval, + Timeout: config.timeout, + ReadyToTrip: config.readyToTrip, + OnStateChange: config.onStateChange, + } + + return gobreaker.NewCircuitBreaker(settings) +} + +// The configuration for gobreaker. 
type config struct { name string halfOpenMaxRequests uint32 @@ -16,7 +33,7 @@ type config struct { onStateChange func(name string, from gobreaker.State, to gobreaker.State) } -// Option will configure a circuit breaker Executor. +// Option will configure gobreaker. type Option func(*config) func applyOptions(opts []Option) config { diff --git a/circuitbreaker/options.go b/circuitbreaker/options.go index 49a9a6b..5856d66 100644 --- a/circuitbreaker/options.go +++ b/circuitbreaker/options.go @@ -2,6 +2,7 @@ package circuitbreaker import ( "gitlab.com/gitlab-org/labkit/circuitbreaker/executor" + "gitlab.com/gitlab-org/labkit/circuitbreaker/executor/gobreaker" ) // The configuration for the circuit breaker. @@ -12,7 +13,7 @@ type config struct { func newConfig() config { return config{ - executor: executor.New(), + executor: gobreaker.NewExecutor(), failer: defaultFailerFunc, } } diff --git a/circuitbreaker/roundtripper_test.go b/circuitbreaker/roundtripper_test.go index 6428fef..cf27340 100644 --- a/circuitbreaker/roundtripper_test.go +++ b/circuitbreaker/roundtripper_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - "gitlab.com/gitlab-org/labkit/circuitbreaker/executor" + "gitlab.com/gitlab-org/labkit/circuitbreaker/executor/gobreaker" ) type delegatedRoundTripper struct { @@ -27,7 +27,7 @@ func TestSucceedingRoundTripper(t *testing.T) { }) client := &http.Client{ - Transport: NewRoundTripper(mockTransport, WithExecutor(executor.New())), + Transport: NewRoundTripper(mockTransport, WithExecutor(gobreaker.NewExecutor())), } req, err := http.NewRequest("GET", "http://example.com", nil) @@ -53,7 +53,7 @@ func TestStatusCode400RoundTripper(t *testing.T) { }) client := &http.Client{ - Transport: NewRoundTripper(mockTransport, WithExecutor(executor.New(executor.WithTimeout(timeout)))), + Transport: NewRoundTripper(mockTransport, WithExecutor(gobreaker.NewExecutor(gobreaker.WithTimeout(timeout)))), } req, err := http.NewRequest("GET", "http://example.com", nil) @@ -106,7 
+106,7 @@ func TestFailingRoundTripper(t *testing.T) { }) client := &http.Client{ - Transport: NewRoundTripper(mockTransport, WithExecutor(executor.New(executor.WithTimeout(timeout)))), + Transport: NewRoundTripper(mockTransport, WithExecutor(gobreaker.NewExecutor(gobreaker.WithTimeout(timeout)))), } req, err := http.NewRequest("GET", "http://example.com", nil) -- GitLab