From 7a4c3dd0b69df6b2a5434adc19bcc0af1bfbce82 Mon Sep 17 00:00:00 2001 From: George Koltsov Date: Fri, 1 Apr 2022 21:48:59 +0100 Subject: [PATCH] Do not throttle enqueueing of BulkImports::EntityWorker --- app/workers/bulk_import_worker.rb | 22 +++------------------- spec/workers/bulk_import_worker_spec.rb | 23 ++++------------------- 2 files changed, 7 insertions(+), 38 deletions(-) diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb index d560ebcc6e620f..157586ca397be4 100644 --- a/app/workers/bulk_import_worker.rb +++ b/app/workers/bulk_import_worker.rb @@ -3,15 +3,12 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker - data_consistency :always + PERFORM_DELAY = 5.seconds + data_consistency :always feature_category :importers - sidekiq_options retry: false, dead: false - PERFORM_DELAY = 5.seconds - DEFAULT_BATCH_SIZE = 5 - def perform(bulk_import_id) @bulk_import = BulkImport.find_by_id(bulk_import_id) @@ -19,11 +16,10 @@ def perform(bulk_import_id) return if @bulk_import.finished? || @bulk_import.failed? return @bulk_import.fail_op! if all_entities_failed? return @bulk_import.finish! if all_entities_processed? && @bulk_import.started? - return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running @bulk_import.start! if @bulk_import.created? - created_entities.first(next_batch_size).each do |entity| + created_entities.find_each do |entity| entity.create_pipeline_trackers! BulkImports::ExportRequestWorker.perform_async(entity.id) @@ -45,10 +41,6 @@ def entities @entities ||= @bulk_import.entities end - def started_entities - entities.with_status(:started) - end - def created_entities entities.with_status(:created) end @@ -61,14 +53,6 @@ def all_entities_failed? entities.all? { |entity| entity.failed? } end - def max_batch_size_exceeded? - started_entities.count >= DEFAULT_BATCH_SIZE - end - - def next_batch_size - [DEFAULT_BATCH_SIZE - started_entities.count, 0].max - end - # A new BulkImportWorker job is enqueued to either # - Process the new BulkImports::Entity created during import (e.g. for the subgroups) # - Or to mark the `bulk_import` as finished diff --git a/spec/workers/bulk_import_worker_spec.rb b/spec/workers/bulk_import_worker_spec.rb index 12e29573156a28..8d9ad03953ec04 100644 --- a/spec/workers/bulk_import_worker_spec.rb +++ b/spec/workers/bulk_import_worker_spec.rb @@ -56,17 +56,6 @@ end end - context 'when maximum allowed number of import entities in progress' do - it 'reenqueues itself' do - bulk_import = create(:bulk_import, :started) - (described_class::DEFAULT_BATCH_SIZE + 1).times { |_| create(:bulk_import_entity, :started, bulk_import: bulk_import) } - - expect(described_class).to receive(:perform_in).with(described_class::PERFORM_DELAY, bulk_import.id) - - subject.perform(bulk_import.id) - end - end - context 'when bulk import is created' do it 'marks bulk import as started' do bulk_import = create(:bulk_import, :created) @@ -93,21 +82,17 @@ context 'when there are created entities to process' do let_it_be(:bulk_import) { create(:bulk_import, :created) } - before do - stub_const("#{described_class}::DEFAULT_BATCH_SIZE", 1) - end - - it 'marks a batch of entities as started, enqueues EntityWorker, ExportRequestWorker and reenqueues' do + it 'marks all entities as started, enqueues EntityWorker, ExportRequestWorker and reenqueues' do create(:bulk_import_entity, :created, bulk_import: bulk_import) create(:bulk_import_entity, :created, bulk_import: bulk_import) expect(described_class).to receive(:perform_in).with(described_class::PERFORM_DELAY, bulk_import.id) - expect(BulkImports::EntityWorker).to receive(:perform_async) - expect(BulkImports::ExportRequestWorker).to receive(:perform_async) + expect(BulkImports::EntityWorker).to receive(:perform_async).twice + expect(BulkImports::ExportRequestWorker).to receive(:perform_async).twice subject.perform(bulk_import.id) - expect(bulk_import.entities.map(&:status_name)).to contain_exactly(:created, :started) + expect(bulk_import.entities.map(&:status_name)).to contain_exactly(:started, :started) end context 'when there are project entities to process' do -- GitLab