diff --git a/app/models/bulk_imports/tracker.rb b/app/models/bulk_imports/tracker.rb index c185470b1c27951f070184c76a7cd26d1012ae19..9de3239ee0f86463654e41dcc9279fb307b94b5c 100644 --- a/app/models/bulk_imports/tracker.rb +++ b/app/models/bulk_imports/tracker.rb @@ -50,6 +50,8 @@ def pipeline_class event :start do transition created: :started + # To avoid errors when re-starting a pipeline in case of network errors + transition started: :started end event :finish do diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 760a309a381b520aaa78df510239530ebd034543..35633b5548925758705291315b6070d9cac53c96 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -16,7 +16,7 @@ class PipelineWorker # rubocop:disable Scalability/IdempotentWorker def perform(pipeline_tracker_id, stage, entity_id) pipeline_tracker = ::BulkImports::Tracker - .with_status(:created) + .with_status(:created, :started) .find_by_id(pipeline_tracker_id) if pipeline_tracker.present? @@ -59,18 +59,35 @@ def run(pipeline_tracker) pipeline_tracker.pipeline_class.new(context).run pipeline_tracker.finish! 
+ rescue BulkImports::NetworkError => e + if e.retriable?(pipeline_tracker) + logger.error( + worker: self.class.name, + entity_id: pipeline_tracker.entity.id, + pipeline_name: pipeline_tracker.pipeline_name, + message: "Retrying error: #{e.message}" + ) + + reenqueue(pipeline_tracker, delay: e.retry_delay) + else + fail_tracker(pipeline_tracker, e) + end rescue StandardError => e + fail_tracker(pipeline_tracker, e) + end + + def fail_tracker(pipeline_tracker, exception) pipeline_tracker.update!(status_event: 'fail_op', jid: jid) logger.error( worker: self.class.name, entity_id: pipeline_tracker.entity.id, pipeline_name: pipeline_tracker.pipeline_name, - message: e.message + message: exception.message ) Gitlab::ErrorTracking.track_exception( - e, + exception, entity_id: pipeline_tracker.entity.id, pipeline_name: pipeline_tracker.pipeline_name ) @@ -88,8 +105,13 @@ def job_timeout?(pipeline_tracker) (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT end - def reenqueue(pipeline_tracker) - self.class.perform_in(NDJSON_PIPELINE_PERFORM_DELAY, pipeline_tracker.id, pipeline_tracker.stage, pipeline_tracker.entity.id) + def reenqueue(pipeline_tracker, delay: NDJSON_PIPELINE_PERFORM_DELAY) + self.class.perform_in( + delay, + pipeline_tracker.id, + pipeline_tracker.stage, + pipeline_tracker.entity.id + ) end end end diff --git a/lib/bulk_imports/clients/graphql.rb b/lib/bulk_imports/clients/graphql.rb index 0adc2b1c57fe1095955bf08204a69c28363eae84..43ad9f0aa2dd6673cd747e1ad2e12fd8a22602c3 100644 --- a/lib/bulk_imports/clients/graphql.rb +++ b/lib/bulk_imports/clients/graphql.rb @@ -17,6 +17,8 @@ def execute(document:, operation_name: nil, variables: {}, context: {}) ) ::Gitlab::Json.parse(response.body) + rescue *Gitlab::HTTP::HTTP_ERRORS => e + raise ::BulkImports::NetworkError, e end end private_constant :HTTP diff --git a/lib/bulk_imports/clients/http.rb b/lib/bulk_imports/clients/http.rb index 
6c363a3552f30c0bfb7f2fecec8b412ef3d5d252..f98d4a5eb149c78a72d03009f406099fd35801d9 100644 --- a/lib/bulk_imports/clients/http.rb +++ b/lib/bulk_imports/clients/http.rb @@ -113,11 +113,11 @@ def request_headers def with_error_handling response = yield - raise(::BulkImports::Error, "Error #{response.code}") unless response.success? + raise ::BulkImports::NetworkError.new(response: response) unless response.success? response rescue *Gitlab::HTTP::HTTP_ERRORS => e - raise(::BulkImports::Error, e) + raise ::BulkImports::NetworkError, e end def api_url diff --git a/lib/bulk_imports/network_error.rb b/lib/bulk_imports/network_error.rb new file mode 100644 index 0000000000000000000000000000000000000000..d69b0172f6c43ca0593066a4d3b40d15afd8e481 --- /dev/null +++ b/lib/bulk_imports/network_error.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true + +module BulkImports + class NetworkError < Error + COUNTER_KEY = 'bulk_imports/%{entity_id}/%{stage}/%{tracker_id}/network_error/%{error}' + + RETRIABLE_EXCEPTIONS = Gitlab::HTTP::HTTP_TIMEOUT_ERRORS + RETRIABLE_HTTP_CODES = [429].freeze + + DEFAULT_RETRY_DELAY_SECONDS = 60 + + MAX_RETRIABLE_COUNT = 3 + + def initialize(message = nil, response: nil) + raise ArgumentError, 'message or response required' if message.blank? && response.blank? + + super(message) + + @response = response + end + + def retriable?(tracker) + if retriable_exception? || retriable_http_code? + increment(tracker) <= MAX_RETRIABLE_COUNT + else + false + end + end + + def retry_delay + if response&.code == 429 + response.headers.fetch('Retry-After', DEFAULT_RETRY_DELAY_SECONDS).to_i + else + DEFAULT_RETRY_DELAY_SECONDS + end.seconds + end + + private + + attr_reader :response + + def retriable_exception? + RETRIABLE_EXCEPTIONS.include?(cause&.class) + end + + def retriable_http_code? 
+ RETRIABLE_HTTP_CODES.include?(response&.code) + end + + def increment(tracker) + key = COUNTER_KEY % { + stage: tracker.stage, + tracker_id: tracker.id, + entity_id: tracker.entity.id, + error: cause.class.name + } + + Gitlab::Cache::Import::Caching.increment(key) + end + end +end diff --git a/lib/gitlab/cache/import/caching.rb b/lib/gitlab/cache/import/caching.rb index 947efee43a9764fc4895f4f5ee9d017f3bc351e2..4dbce0b05e172d0b93d39e915f8f4fc3ba213495 100644 --- a/lib/gitlab/cache/import/caching.rb +++ b/lib/gitlab/cache/import/caching.rb @@ -84,8 +84,10 @@ def self.increment(raw_key, timeout: TIMEOUT) key = cache_key_for(raw_key) Redis::Cache.with do |redis| - redis.incr(key) + value = redis.incr(key) redis.expire(key, timeout) + + value end end diff --git a/spec/lib/bulk_imports/clients/http_spec.rb b/spec/lib/bulk_imports/clients/http_spec.rb index c36cb80851a1b076c69c592741b4db3c32a1d3c5..023562626a1a1a46dc450a51e79804bab7bef0dd 100644 --- a/spec/lib/bulk_imports/clients/http_spec.rb +++ b/spec/lib/bulk_imports/clients/http_spec.rb @@ -32,7 +32,7 @@ it 'raises BulkImports::Error' do allow(Gitlab::HTTP).to receive(method).and_raise(Errno::ECONNREFUSED) - expect { subject.public_send(method, resource) }.to raise_exception(BulkImports::Error) + expect { subject.public_send(method, resource) }.to raise_exception(BulkImports::NetworkError) end end @@ -42,7 +42,7 @@ allow(Gitlab::HTTP).to receive(method).and_return(response_double) - expect { subject.public_send(method, resource) }.to raise_exception(BulkImports::Error) + expect { subject.public_send(method, resource) }.to raise_exception(BulkImports::NetworkError) end end end @@ -180,7 +180,11 @@ def stub_http_get(path, query, response) let(:version) { '13.0.0' } it 'raises an error' do - expect { subject.get(resource) }.to raise_error(::BulkImports::Error, "Unsupported GitLab Version. 
Minimum Supported Gitlab Version #{BulkImport::MINIMUM_GITLAB_MAJOR_VERSION}.") + expect { subject.get(resource) } + .to raise_error( + ::BulkImports::Error, + "Unsupported GitLab Version. Minimum Supported Gitlab Version #{BulkImport::MINIMUM_GITLAB_MAJOR_VERSION}." + ) end end diff --git a/spec/lib/bulk_imports/network_error_spec.rb b/spec/lib/bulk_imports/network_error_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..11f555fee09ed5dca73438fd41c6057b4fb8dfeb --- /dev/null +++ b/spec/lib/bulk_imports/network_error_spec.rb @@ -0,0 +1,72 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe BulkImports::NetworkError, :clean_gitlab_redis_cache do + let(:tracker) { double(id: 1, stage: 2, entity: double(id: 3)) } + + describe '.new' do + it 'requires either a message or a HTTP response' do + expect { described_class.new } + .to raise_error(ArgumentError, 'message or response required') + end + end + + describe '#retriable?' do + it 'returns true for MAX_RETRIABLE_COUNT times when cause is one of RETRIABLE_EXCEPTIONS' do + raise described_class::RETRIABLE_EXCEPTIONS.sample + rescue StandardError => cause + begin + raise described_class, cause + rescue StandardError => exception + described_class::MAX_RETRIABLE_COUNT.times do + expect(exception.retriable?(tracker)).to eq(true) + end + + expect(exception.retriable?(tracker)).to eq(false) + end + end + + it 'returns true for MAX_RETRIABLE_COUNT times when response is one of RETRIABLE_HTTP_CODES' do + exception = described_class.new(response: double(code: 429)) + + described_class::MAX_RETRIABLE_COUNT.times do + expect(exception.retriable?(tracker)).to eq(true) + end + + expect(exception.retriable?(tracker)).to eq(false) + end + + it 'returns false for other exceptions' do + raise StandardError + rescue StandardError => cause + begin + raise described_class, cause + rescue StandardError => exception + expect(exception.retriable?(tracker)).to eq(false) + end + end + end + + 
describe '#retry_delay' do + it 'returns the default value when there is not a rate limit error' do + exception = described_class.new('foo') + + expect(exception.retry_delay).to eq(described_class::DEFAULT_RETRY_DELAY_SECONDS.seconds) + end + + context 'when the exception is a rate limit error' do + it 'returns the "Retry-After"' do + exception = described_class.new(response: double(code: 429, headers: { 'Retry-After' => 20 })) + + expect(exception.retry_delay).to eq(20.seconds) + end + + it 'returns the default value when there is no "Retry-After" header' do + exception = described_class.new(response: double(code: 429, headers: {})) + + expect(exception.retry_delay).to eq(described_class::DEFAULT_RETRY_DELAY_SECONDS.seconds) + end + end + end +end diff --git a/spec/lib/gitlab/cache/import/caching_spec.rb b/spec/lib/gitlab/cache/import/caching_spec.rb index f770960e27a0a4b489cdd08d0609e200400b01ef..946a7c604a1d53244bf5226fa08886e27ea0a5a6 100644 --- a/spec/lib/gitlab/cache/import/caching_spec.rb +++ b/spec/lib/gitlab/cache/import/caching_spec.rb @@ -58,6 +58,16 @@ end end + describe '.increment' do + it 'increments a key and returns the current value' do + expect(described_class.increment('foo')).to eq(1) + + value = Gitlab::Redis::Cache.with { |r| r.get(described_class.cache_key_for('foo')) } + + expect(value.to_i).to eq(1) + end + end + describe '.set_add' do it 'adds a value to a set' do described_class.set_add('foo', 10) diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb index 56f28654ac55fe00e537e4687d986b8ccb6bb260..fc70a2582e484008d653190f741775bcf71b738e 100644 --- a/spec/workers/bulk_imports/pipeline_worker_spec.rb +++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb @@ -27,42 +27,59 @@ def self.ndjson_pipeline? 
.and_return([[0, pipeline_class]]) end - it 'runs the given pipeline successfully' do - pipeline_tracker = create( - :bulk_import_tracker, - entity: entity, - pipeline_name: 'FakePipeline' - ) - - expect_next_instance_of(Gitlab::Import::Logger) do |logger| - expect(logger) - .to receive(:info) - .with( - worker: described_class.name, - pipeline_name: 'FakePipeline', - entity_id: entity.id - ) - end + shared_examples 'successfully runs the pipeline' do + it 'runs the given pipeline successfully' do + expect_next_instance_of(Gitlab::Import::Logger) do |logger| + expect(logger) + .to receive(:info) + .with( + worker: described_class.name, + pipeline_name: 'FakePipeline', + entity_id: entity.id + ) + end - expect(BulkImports::EntityWorker) - .to receive(:perform_async) - .with(entity.id, pipeline_tracker.stage) + expect(BulkImports::EntityWorker) + .to receive(:perform_async) + .with(entity.id, pipeline_tracker.stage) + + expect(subject).to receive(:jid).and_return('jid') - expect(subject).to receive(:jid).and_return('jid') + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) - subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + pipeline_tracker.reload - pipeline_tracker.reload + expect(pipeline_tracker.status_name).to eq(:finished) + expect(pipeline_tracker.jid).to eq('jid') + end + end - expect(pipeline_tracker.status_name).to eq(:finished) - expect(pipeline_tracker.jid).to eq('jid') + it_behaves_like 'successfully runs the pipeline' do + let(:pipeline_tracker) do + create( + :bulk_import_tracker, + entity: entity, + pipeline_name: 'FakePipeline' + ) + end + end + + it_behaves_like 'successfully runs the pipeline' do + let(:pipeline_tracker) do + create( + :bulk_import_tracker, + :started, + entity: entity, + pipeline_name: 'FakePipeline' + ) + end end context 'when the pipeline cannot be found' do it 'logs the error' do pipeline_tracker = create( :bulk_import_tracker, - :started, + :finished, entity: entity, pipeline_name: 
'FakePipeline' ) @@ -126,6 +143,39 @@ def self.ndjson_pipeline? expect(pipeline_tracker.status_name).to eq(:failed) expect(pipeline_tracker.jid).to eq('jid') end + + context 'when it is a network error' do + it 'reenqueues on retriable network errors' do + pipeline_tracker = create( + :bulk_import_tracker, + entity: entity, + pipeline_name: 'FakePipeline' + ) + + exception = BulkImports::NetworkError.new( + response: double(code: 429, headers: {}) + ) + + expect_next_instance_of(pipeline_class) do |pipeline| + expect(pipeline) + .to receive(:run) + .and_raise(exception) + end + + expect(subject).to receive(:jid).and_return('jid') + + expect(described_class) + .to receive(:perform_in) + .with( + 60.seconds, + pipeline_tracker.id, + pipeline_tracker.stage, + pipeline_tracker.entity.id + ) + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + end + end end context 'when ndjson pipeline' do