From 8c79744113949678f2ec8bb698cf848a5647fbc4 Mon Sep 17 00:00:00 2001 From: Rodrigo Tomonari Date: Mon, 6 Nov 2023 21:44:40 -0300 Subject: [PATCH 1/7] Defer Direct Transfer workers when the DB is not healthy Update the PipelineWorker and PipelineBatchWorker workers to use the `defer_on_database_health_signal` option. Also, update defer_on_database_health_signal to accept a block that lazily evaluates the schema and tables, since the workers modify different tables depending on the relation being imported. --- .../bulk_imports/pipeline_batch_worker.rb | 15 +++++ app/workers/bulk_imports/pipeline_worker.rb | 14 +++++ app/workers/concerns/worker_attributes.rb | 4 +- lib/bulk_imports/pipeline_schema_info.rb | 36 +++++++++++ lib/gitlab/sidekiq_middleware/skip_jobs.rb | 12 +++- .../bulk_imports/pipeline_schema_info_spec.rb | 59 +++++++++++++++++++ .../sidekiq_middleware/skip_jobs_spec.rb | 15 +++++ .../pipeline_batch_worker_spec.rb | 28 +++++++++ .../bulk_imports/pipeline_worker_spec.rb | 33 +++++++++++ 9 files changed, 211 insertions(+), 5 deletions(-) create mode 100644 lib/bulk_imports/pipeline_schema_info.rb create mode 100644 spec/lib/bulk_imports/pipeline_schema_info_spec.rb diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb index 55936e85d48321..28f4b15557f65b 100644 --- a/app/workers/bulk_imports/pipeline_batch_worker.rb +++ b/app/workers/bulk_imports/pipeline_batch_worker.rb @@ -5,6 +5,8 @@ class PipelineBatchWorker include ApplicationWorker include ExclusiveLeaseGuard + DEFER_ON_HEALTH_DELAY = 5.minutes + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency feature_category :importers sidekiq_options dead: false, retry: 3 @@ -16,6 +18,19 @@ class PipelineBatchWorker new.perform_failure(msg['args'].first, exception) end + defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables| + batch = ::BulkImports::BatchTracker.find(job_args.first) 
+ pipeline_tracker = batch.tracker + pipeline_schema = ::BulkImports::PipelineSchemaInfo.new(pipeline_tracker.pipeline_class, pipeline_tracker.entity) + + if pipeline_schema.db_schema && pipeline_schema.db_table + schema = pipeline_schema.db_schema + tables = [pipeline_schema.db_table] + end + + [schema, tables] + end + def perform(batch_id) @batch = ::BulkImports::BatchTracker.find(batch_id) diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 4b1df9c85a69ae..b3cd1949c7768c 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -7,6 +7,8 @@ class PipelineWorker FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds + DEFER_ON_HEALTH_DELAY = 5.minutes + data_consistency :always feature_category :importers sidekiq_options dead: false, retry: 3 @@ -21,6 +23,18 @@ class PipelineWorker new.perform_failure(msg['args'][0], msg['args'][2], exception) end + defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables| + pipeline_tracker = ::BulkImports::Tracker.find(job_args.first) + pipeline_schema = ::BulkImports::PipelineSchemaInfo.new(pipeline_tracker.pipeline_class, pipeline_tracker.entity) + + if pipeline_schema.db_schema && pipeline_schema.db_table + schema = pipeline_schema.db_schema + tables = [pipeline_schema.db_table] + end + + [schema, tables] + end + # Keep _stage parameter for backwards compatibility. def perform(pipeline_tracker_id, _stage, entity_id) @entity = ::BulkImports::Entity.find(entity_id) diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb index cb09aaf1a6a6cc..28c82a5a38ea6f 100644 --- a/app/workers/concerns/worker_attributes.rb +++ b/app/workers/concerns/worker_attributes.rb @@ -201,10 +201,10 @@ def big_payload? 
!!get_class_attribute(:big_payload) end - def defer_on_database_health_signal(gitlab_schema, tables = [], delay_by = DEFAULT_DEFER_DELAY) + def defer_on_database_health_signal(gitlab_schema, tables = [], delay_by = DEFAULT_DEFER_DELAY, &block) set_class_attribute( :database_health_check_attrs, - { gitlab_schema: gitlab_schema, tables: tables, delay_by: delay_by } + { gitlab_schema: gitlab_schema, tables: tables, delay_by: delay_by, block: block } ) end diff --git a/lib/bulk_imports/pipeline_schema_info.rb b/lib/bulk_imports/pipeline_schema_info.rb new file mode 100644 index 00000000000000..c44e9da59a60dd --- /dev/null +++ b/lib/bulk_imports/pipeline_schema_info.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +module BulkImports + class PipelineSchemaInfo + def initialize(pipeline_class, entity) + @pipeline = pipeline_class.new(nil) + @portable_class = entity.project? ? Project : Group + end + + def db_schema + return unless relation + return unless association + + Gitlab::Database::GitlabSchema.tables_to_schema[association.table_name] + end + + def db_table + return unless relation + return unless association + + association.table_name + end + + private + + attr_reader :pipeline, :portable_class + + def relation + pipeline.try(:relation) + end + + def association + @association ||= portable_class.reflect_on_association(relation) + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/skip_jobs.rb b/lib/gitlab/sidekiq_middleware/skip_jobs.rb index 34ad843e8ee597..ebb3f6782ba498 100644 --- a/lib/gitlab/sidekiq_middleware/skip_jobs.rb +++ b/lib/gitlab/sidekiq_middleware/skip_jobs.rb @@ -80,13 +80,19 @@ def defer_job_by_database_health_signal?(job, worker_class) end health_check_attrs = worker_class.database_health_check_attrs - job_base_model = Gitlab::Database.schemas_to_base_models[health_check_attrs[:gitlab_schema]].first + + tables = health_check_attrs[:tables] + schema = health_check_attrs[:gitlab_schema] + + schema, tables = 
health_check_attrs[:block].call(job['args'], schema, tables) if health_check_attrs[:block] + + job_base_model = Gitlab::Database.schemas_to_base_models[schema].first health_context = Gitlab::Database::HealthStatus::Context.new( DatabaseHealthStatusChecker.new(job['jid'], worker_class.name), job_base_model.connection, - health_check_attrs[:tables], - health_check_attrs[:gitlab_schema] + tables, + schema ) Gitlab::Database::HealthStatus.evaluate(health_context).any?(&:stop?) diff --git a/spec/lib/bulk_imports/pipeline_schema_info_spec.rb b/spec/lib/bulk_imports/pipeline_schema_info_spec.rb new file mode 100644 index 00000000000000..18761224fdff31 --- /dev/null +++ b/spec/lib/bulk_imports/pipeline_schema_info_spec.rb @@ -0,0 +1,59 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe BulkImports::PipelineSchemaInfo, feature_category: :importers do + let_it_be(:entity) { build(:bulk_import_entity, :project_entity) } + + let(:pipeline_class) { BulkImports::Common::Pipelines::LabelsPipeline } + + subject { described_class.new(pipeline_class, entity) } + + describe '#db_schema' do + context 'when pipeline defines a relation name which is an association' do + it 'returns the schema name of the table used by the association' do + expect(subject.db_schema).to eq(:gitlab_main) + end + end + + context 'when pipeline does not define a relation name' do + let(:pipeline_class) { BulkImports::Common::Pipelines::BadgesPipeline } + + it 'returns nil' do + expect(subject.db_schema).to eq(nil) + end + end + + context 'when pipeline relation name is not an association' do + let(:pipeline_class) { BulkImports::Projects::Pipelines::CommitNotesPipeline } + + it 'returns nil' do + expect(subject.db_schema).to eq(nil) + end + end + end + + describe '#db_table' do + context 'when pipeline defines a relation name which is an association' do + it 'returns the name of the table used by the association' do + expect(subject.db_table).to eq('labels') + end + end + + context 
'when pipeline does not define a relation name' do + let(:pipeline_class) { BulkImports::Common::Pipelines::BadgesPipeline } + + it 'returns nil' do + expect(subject.db_table).to eq(nil) + end + end + + context 'when pipeline relation name is not an association' do + let(:pipeline_class) { BulkImports::Projects::Pipelines::CommitNotesPipeline } + + it 'returns nil' do + expect(subject.db_table).to eq(nil) + end + end + end +end diff --git a/spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb b/spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb index 2fa0e44d44f15f..e007439618291a 100644 --- a/spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb +++ b/spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb @@ -185,6 +185,21 @@ def self.name TestWorker.perform_async(*job['args']) end end + + context 'with a block' do + before do + TestWorker.defer_on_database_health_signal(*health_signal_attrs.values) do + [:gitlab_ci, [:ci_pipelines]] + end + end + + it 'uses the lazy evaluated schema and tables' do + expect(Gitlab::Database::HealthStatus::Context).to receive(:new) + .with(anything, anything, [:ci_pipelines], :gitlab_ci).and_call_original + + expect { |b| subject.call(TestWorker.new, job, queue, &b) }.to yield_control + end + end end end end diff --git a/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb b/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb index 9ac297ae75782a..57ccffd2a14cbc 100644 --- a/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb +++ b/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb @@ -202,4 +202,32 @@ def self.file_extraction_pipeline? expect(batch.reload).to be_failed end end + + context 'with stop signal from database health check' do + around do |example| + with_sidekiq_server_middleware do |chain| + chain.add Gitlab::SidekiqMiddleware::SkipJobs + Sidekiq::Testing.inline! 
{ example.run } + end + end + + context 'with stop signal from database health check' do + before do + stub_feature_flags("drop_sidekiq_jobs_#{described_class.name}": false) + + stop_signal = instance_double("Gitlab::Database::HealthStatus::Signals::Stop", stop?: true) + allow(Gitlab::Database::HealthStatus).to receive(:evaluate).and_return([stop_signal]) + end + + it 'defers the job by set time' do + expect_next_instance_of(described_class) do |worker| + expect(worker).not_to receive(:perform).with(batch.id) + end + + expect(described_class).to receive(:perform_in).with(described_class::DEFER_ON_HEALTH_DELAY, batch.id) + + described_class.perform_async(batch.id) + end + end + end end diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb index 6ea7334f6a67b0..9ec2bbd1e3215f 100644 --- a/spec/workers/bulk_imports/pipeline_worker_spec.rb +++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb @@ -155,6 +155,39 @@ def self.file_extraction_pipeline? end end + context 'with stop signal from database health check' do + around do |example| + with_sidekiq_server_middleware do |chain| + chain.add Gitlab::SidekiqMiddleware::SkipJobs + Sidekiq::Testing.inline! 
{ example.run } + end + end + + context 'with stop signal from database health check' do + before do + stub_feature_flags("drop_sidekiq_jobs_#{described_class.name}": false) + + stop_signal = instance_double("Gitlab::Database::HealthStatus::Signals::Stop", stop?: true) + allow(Gitlab::Database::HealthStatus).to receive(:evaluate).and_return([stop_signal]) + end + + it 'defers the job by set time' do + expect_next_instance_of(described_class) do |worker| + expect(worker).not_to receive(:perform).with(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + end + + expect(described_class).to receive(:perform_in).with( + described_class::DEFER_ON_HEALTH_DELAY, + pipeline_tracker.id, + pipeline_tracker.stage, + entity.id + ) + + described_class.perform_async(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + end + end + end + context 'when pipeline is finished' do let(:pipeline_tracker) do create( -- GitLab From 98fa82de4ec0bd530f07ed61eb9719e29f18f861 Mon Sep 17 00:00:00 2001 From: Rodrigo Tomonari Date: Tue, 7 Nov 2023 10:15:46 -0300 Subject: [PATCH 2/7] Fix worker spec --- spec/workers/concerns/worker_attributes_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/workers/concerns/worker_attributes_spec.rb b/spec/workers/concerns/worker_attributes_spec.rb index 90c07a9c9590fc..767a55162fbf3a 100644 --- a/spec/workers/concerns/worker_attributes_spec.rb +++ b/spec/workers/concerns/worker_attributes_spec.rb @@ -37,7 +37,7 @@ def self.name :worker_has_external_dependencies? | :worker_has_external_dependencies! | false | [] | true :idempotent? | :idempotent! | false | [] | true :big_payload? | :big_payload! 
| false | [] | true - :database_health_check_attrs | :defer_on_database_health_signal | nil | [:gitlab_main, [:users], 1.minute] | { gitlab_schema: :gitlab_main, tables: [:users], delay_by: 1.minute } + :database_health_check_attrs | :defer_on_database_health_signal | nil | [:gitlab_main, [:users], 1.minute] | { gitlab_schema: :gitlab_main, tables: [:users], delay_by: 1.minute, block: nil } end # rubocop: enable Layout/LineLength -- GitLab From c98553e0e71209658a936caaa1d22519abf6e05d Mon Sep 17 00:00:00 2001 From: Rodrigo Tomonari Date: Tue, 7 Nov 2023 20:19:26 -0300 Subject: [PATCH 3/7] Add feature flag and refactor --- app/models/bulk_imports/entity.rb | 4 ++ .../bulk_imports/pipeline_batch_worker.rb | 9 +++- app/workers/bulk_imports/pipeline_worker.rb | 9 +++- .../bulk_import_deferred_workers.yml | 8 +++ lib/bulk_imports/pipeline_schema_info.rb | 10 ++-- .../bulk_imports/pipeline_schema_info_spec.rb | 15 +++--- spec/models/bulk_imports/entity_spec.rb | 18 +++++++ .../pipeline_batch_worker_spec.rb | 43 ++++++++++++--- .../bulk_imports/pipeline_worker_spec.rb | 53 ++++++++++++++----- 9 files changed, 134 insertions(+), 35 deletions(-) create mode 100644 config/feature_flags/development/bulk_import_deferred_workers.yml diff --git a/app/models/bulk_imports/entity.rb b/app/models/bulk_imports/entity.rb index 437118c36e8442..a075c2f7e4ff72 100644 --- a/app/models/bulk_imports/entity.rb +++ b/app/models/bulk_imports/entity.rb @@ -124,6 +124,10 @@ def pluralized_name entity_type.pluralize end + def portable_class + entity_type.classify.constantize + end + def base_resource_url_path "/#{pluralized_name}/#{encoded_source_full_path}" end diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb index 28f4b15557f65b..1485275e6162c4 100644 --- a/app/workers/bulk_imports/pipeline_batch_worker.rb +++ b/app/workers/bulk_imports/pipeline_batch_worker.rb @@ -21,7 +21,10 @@ class PipelineBatchWorker 
defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables| batch = ::BulkImports::BatchTracker.find(job_args.first) pipeline_tracker = batch.tracker - pipeline_schema = ::BulkImports::PipelineSchemaInfo.new(pipeline_tracker.pipeline_class, pipeline_tracker.entity) + pipeline_schema = ::BulkImports::PipelineSchemaInfo.new( + pipeline_tracker.pipeline_class, + pipeline_tracker.entity.portable_class + ) if pipeline_schema.db_schema && pipeline_schema.db_table schema = pipeline_schema.db_schema @@ -31,6 +34,10 @@ class PipelineBatchWorker [schema, tables] end + def self.defer_on_database_health_signal? + Feature.enabled?(:bulk_import_deferred_workers) + end + def perform(batch_id) @batch = ::BulkImports::BatchTracker.find(batch_id) diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index b3cd1949c7768c..2c1d28b33c5215 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -25,7 +25,10 @@ class PipelineWorker defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables| pipeline_tracker = ::BulkImports::Tracker.find(job_args.first) - pipeline_schema = ::BulkImports::PipelineSchemaInfo.new(pipeline_tracker.pipeline_class, pipeline_tracker.entity) + pipeline_schema = ::BulkImports::PipelineSchemaInfo.new( + pipeline_tracker.pipeline_class, + pipeline_tracker.entity.portable_class + ) if pipeline_schema.db_schema && pipeline_schema.db_table schema = pipeline_schema.db_schema @@ -35,6 +38,10 @@ class PipelineWorker [schema, tables] end + def self.defer_on_database_health_signal? + Feature.enabled?(:bulk_import_deferred_workers) + end + # Keep _stage parameter for backwards compatibility. 
def perform(pipeline_tracker_id, _stage, entity_id) @entity = ::BulkImports::Entity.find(entity_id) diff --git a/config/feature_flags/development/bulk_import_deferred_workers.yml b/config/feature_flags/development/bulk_import_deferred_workers.yml new file mode 100644 index 00000000000000..1b6a022099ccd6 --- /dev/null +++ b/config/feature_flags/development/bulk_import_deferred_workers.yml @@ -0,0 +1,8 @@ +--- +name: bulk_import_deferred_workers +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/136137 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/431032 +milestone: '16.6' +type: development +group: group::import and integrate +default_enabled: false diff --git a/lib/bulk_imports/pipeline_schema_info.rb b/lib/bulk_imports/pipeline_schema_info.rb index c44e9da59a60dd..df35a3569d643d 100644 --- a/lib/bulk_imports/pipeline_schema_info.rb +++ b/lib/bulk_imports/pipeline_schema_info.rb @@ -2,9 +2,9 @@ module BulkImports class PipelineSchemaInfo - def initialize(pipeline_class, entity) - @pipeline = pipeline_class.new(nil) - @portable_class = entity.project? ? 
Project : Group + def initialize(pipeline_class, portable_class) + @pipeline_class = pipeline_class + @portable_class = portable_class end def db_schema @@ -23,10 +23,10 @@ def db_table private - attr_reader :pipeline, :portable_class + attr_reader :pipeline_class, :portable_class def relation - pipeline.try(:relation) + @relation ||= pipeline_class.try(:relation) end def association diff --git a/spec/lib/bulk_imports/pipeline_schema_info_spec.rb b/spec/lib/bulk_imports/pipeline_schema_info_spec.rb index 18761224fdff31..b11a4147c285f6 100644 --- a/spec/lib/bulk_imports/pipeline_schema_info_spec.rb +++ b/spec/lib/bulk_imports/pipeline_schema_info_spec.rb @@ -3,11 +3,12 @@ require 'spec_helper' RSpec.describe BulkImports::PipelineSchemaInfo, feature_category: :importers do - let_it_be(:entity) { build(:bulk_import_entity, :project_entity) } + let(:entity) { build(:bulk_import_entity, :project_entity) } + let(:tracker) { build(:bulk_import_tracker, entity: entity, pipeline_name: pipeline_name) } - let(:pipeline_class) { BulkImports::Common::Pipelines::LabelsPipeline } + let(:pipeline_name) { BulkImports::Common::Pipelines::LabelsPipeline.to_s } - subject { described_class.new(pipeline_class, entity) } + subject { described_class.new(tracker.pipeline_class, tracker.entity.portable_class) } describe '#db_schema' do context 'when pipeline defines a relation name which is an association' do @@ -17,7 +18,7 @@ end context 'when pipeline does not define a relation name' do - let(:pipeline_class) { BulkImports::Common::Pipelines::BadgesPipeline } + let(:pipeline_name) { BulkImports::Common::Pipelines::EntityFinisher.to_s } it 'returns nil' do expect(subject.db_schema).to eq(nil) @@ -25,7 +26,7 @@ end context 'when pipeline relation name is not an association' do - let(:pipeline_class) { BulkImports::Projects::Pipelines::CommitNotesPipeline } + let(:pipeline_name) { BulkImports::Projects::Pipelines::CommitNotesPipeline.to_s } it 'returns nil' do expect(subject.db_schema).to 
eq(nil) @@ -41,7 +42,7 @@ end context 'when pipeline does not define a relation name' do - let(:pipeline_class) { BulkImports::Common::Pipelines::BadgesPipeline } + let(:pipeline_name) { BulkImports::Common::Pipelines::EntityFinisher.to_s } it 'returns nil' do expect(subject.db_table).to eq(nil) @@ -49,7 +50,7 @@ end context 'when pipeline relation name is not an association' do - let(:pipeline_class) { BulkImports::Projects::Pipelines::CommitNotesPipeline } + let(:pipeline_name) { BulkImports::Projects::Pipelines::CommitNotesPipeline.to_s } it 'returns nil' do expect(subject.db_table).to eq(nil) diff --git a/spec/models/bulk_imports/entity_spec.rb b/spec/models/bulk_imports/entity_spec.rb index 3e98ba0973e8b0..b822786579b671 100644 --- a/spec/models/bulk_imports/entity_spec.rb +++ b/spec/models/bulk_imports/entity_spec.rb @@ -248,6 +248,24 @@ end end + describe '#portable_class' do + context 'when entity is group' do + it 'returns Group class' do + entity = build(:bulk_import_entity, :group_entity) + + expect(entity.portable_class).to eq(Group) + end + end + + context 'when entity is project' do + it 'returns Project class' do + entity = build(:bulk_import_entity, :project_entity) + + expect(entity.portable_class).to eq(Project) + end + end + end + describe '#export_relations_url_path' do context 'when entity is group' do it 'returns group export relations url' do diff --git a/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb b/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb index 57ccffd2a14cbc..ef9c074e1ffcf8 100644 --- a/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb +++ b/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb @@ -13,6 +13,10 @@ def initialize(context) @context = context end + def self.relation + 'labels' + end + def run @context.tracker.finish! end @@ -211,20 +215,43 @@ def self.file_extraction_pipeline? 
end end - context 'with stop signal from database health check' do - before do - stub_feature_flags("drop_sidekiq_jobs_#{described_class.name}": false) + before do + stub_feature_flags("drop_sidekiq_jobs_#{described_class.name}": false) + + stop_signal = instance_double("Gitlab::Database::HealthStatus::Signals::Stop", stop?: true) + allow(Gitlab::Database::HealthStatus).to receive(:evaluate).and_return([stop_signal]) + end - stop_signal = instance_double("Gitlab::Database::HealthStatus::Signals::Stop", stop?: true) - allow(Gitlab::Database::HealthStatus).to receive(:evaluate).and_return([stop_signal]) + it 'defers the job by set time' do + expect_next_instance_of(described_class) do |worker| + expect(worker).not_to receive(:perform).with(batch.id) end - it 'defers the job by set time' do + expect(described_class).to receive(:perform_in).with(described_class::DEFER_ON_HEALTH_DELAY, batch.id) + + described_class.perform_async(batch.id) + end + + it 'lazy evaluates schema and tables', :aggregate_failures do + block = described_class.database_health_check_attrs[:block] + + job_args = [batch.id] + + schema, table = block.call([job_args]) + + expect(schema).to eq(:gitlab_main) + expect(table).to eq(['labels']) + end + + context 'when `bulk_import_deferred_workers` feature flag is disabled' do + it 'does not defer job execution' do + stub_feature_flags(bulk_import_deferred_workers: false) + expect_next_instance_of(described_class) do |worker| - expect(worker).not_to receive(:perform).with(batch.id) + expect(worker).to receive(:perform).with(batch.id) end - expect(described_class).to receive(:perform_in).with(described_class::DEFER_ON_HEALTH_DELAY, batch.id) + expect(described_class).not_to receive(:perform_in) described_class.perform_async(batch.id) end diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb index 9ec2bbd1e3215f..873ff0b989b3b8 100644 --- a/spec/workers/bulk_imports/pipeline_worker_spec.rb +++ 
b/spec/workers/bulk_imports/pipeline_worker_spec.rb @@ -9,6 +9,10 @@ def initialize(_); end def run; end + def self.relation + 'labels' + end + def self.file_extraction_pipeline? false end @@ -163,25 +167,48 @@ def self.file_extraction_pipeline? end end - context 'with stop signal from database health check' do - before do - stub_feature_flags("drop_sidekiq_jobs_#{described_class.name}": false) + before do + stub_feature_flags("drop_sidekiq_jobs_#{described_class.name}": false) - stop_signal = instance_double("Gitlab::Database::HealthStatus::Signals::Stop", stop?: true) - allow(Gitlab::Database::HealthStatus).to receive(:evaluate).and_return([stop_signal]) + stop_signal = instance_double("Gitlab::Database::HealthStatus::Signals::Stop", stop?: true) + allow(Gitlab::Database::HealthStatus).to receive(:evaluate).and_return([stop_signal]) + end + + it 'defers the job by set time' do + expect_next_instance_of(described_class) do |worker| + expect(worker).not_to receive(:perform).with(pipeline_tracker.id, pipeline_tracker.stage, entity.id) end - it 'defers the job by set time' do + expect(described_class).to receive(:perform_in).with( + described_class::DEFER_ON_HEALTH_DELAY, + pipeline_tracker.id, + pipeline_tracker.stage, + entity.id + ) + + described_class.perform_async(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + end + + it 'lazy evaluates schema and tables', :aggregate_failures do + block = described_class.database_health_check_attrs[:block] + + job_args = [pipeline_tracker.id, pipeline_tracker.stage, entity.id] + + schema, table = block.call([job_args]) + + expect(schema).to eq(:gitlab_main) + expect(table).to eq(['labels']) + end + + context 'when `bulk_import_deferred_workers` feature flag is disabled' do + it 'does not defer job execution' do + stub_feature_flags(bulk_import_deferred_workers: false) + expect_next_instance_of(described_class) do |worker| - expect(worker).not_to receive(:perform).with(pipeline_tracker.id, pipeline_tracker.stage, 
entity.id) + expect(worker).to receive(:perform).with(pipeline_tracker.id, pipeline_tracker.stage, entity.id) end - expect(described_class).to receive(:perform_in).with( - described_class::DEFER_ON_HEALTH_DELAY, - pipeline_tracker.id, - pipeline_tracker.stage, - entity.id - ) + expect(described_class).not_to receive(:perform_in) described_class.perform_async(pipeline_tracker.id, pipeline_tracker.stage, entity.id) end -- GitLab From 9cdcbc05f77913ae19016c19b421a25b31559c0f Mon Sep 17 00:00:00 2001 From: Rodrigo Tomonari Date: Tue, 7 Nov 2023 20:24:58 -0300 Subject: [PATCH 4/7] Change spec description --- spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb b/spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb index e007439618291a..6df77c350e239e 100644 --- a/spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb +++ b/spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb @@ -186,14 +186,14 @@ def self.name end end - context 'with a block' do + context 'when a block is provided' do before do TestWorker.defer_on_database_health_signal(*health_signal_attrs.values) do [:gitlab_ci, [:ci_pipelines]] end end - it 'uses the lazy evaluated schema and tables' do + it 'uses the lazy evaluated schema and tables returned by the block' do expect(Gitlab::Database::HealthStatus::Context).to receive(:new) .with(anything, anything, [:ci_pipelines], :gitlab_ci).and_call_original -- GitLab From a5f6f30669a6ee57067099636b3b351aa2a5f3a2 Mon Sep 17 00:00:00 2001 From: Rodrigo Tomonari Date: Tue, 7 Nov 2023 23:08:59 -0300 Subject: [PATCH 5/7] Fix worker spec --- .../ci/pipeline_success_unlock_artifacts_worker_spec.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spec/workers/ci/pipeline_success_unlock_artifacts_worker_spec.rb b/spec/workers/ci/pipeline_success_unlock_artifacts_worker_spec.rb index 60a34fdab53554..d5f3c2b92fb095 100644 --- 
a/spec/workers/ci/pipeline_success_unlock_artifacts_worker_spec.rb +++ b/spec/workers/ci/pipeline_success_unlock_artifacts_worker_spec.rb @@ -75,7 +75,8 @@ expect(described_class.database_health_check_attrs).to eq( gitlab_schema: :gitlab_ci, delay_by: described_class::DEFAULT_DEFER_DELAY, - tables: [:ci_job_artifacts] + tables: [:ci_job_artifacts], + block: nil ) end end -- GitLab From b757e64ca26bf632b8ed7d32a6bf7ff9d17e6515 Mon Sep 17 00:00:00 2001 From: Rodrigo Tomonari Date: Thu, 9 Nov 2023 12:26:07 -0300 Subject: [PATCH 6/7] Uses values_at and check if block responds to call --- lib/gitlab/sidekiq_middleware/skip_jobs.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/gitlab/sidekiq_middleware/skip_jobs.rb b/lib/gitlab/sidekiq_middleware/skip_jobs.rb index ebb3f6782ba498..56b150116a3361 100644 --- a/lib/gitlab/sidekiq_middleware/skip_jobs.rb +++ b/lib/gitlab/sidekiq_middleware/skip_jobs.rb @@ -81,10 +81,11 @@ def defer_job_by_database_health_signal?(job, worker_class) health_check_attrs = worker_class.database_health_check_attrs - tables = health_check_attrs[:tables] - schema = health_check_attrs[:gitlab_schema] + tables, schema = health_check_attrs.values_at(:tables, :gitlab_schema) - schema, tables = health_check_attrs[:block].call(job['args'], schema, tables) if health_check_attrs[:block] + if health_check_attrs[:block].respond_to?(:call) + schema, tables = health_check_attrs[:block].call(job['args'], schema, tables) + end job_base_model = Gitlab::Database.schemas_to_base_models[schema].first -- GitLab From d47443b2e9213a6dbe01cc6a5a31b24b5e8c1b4a Mon Sep 17 00:00:00 2001 From: Rodrigo Tomonari Date: Thu, 9 Nov 2023 14:55:35 -0300 Subject: [PATCH 7/7] Change db schema in the specs --- spec/lib/bulk_imports/pipeline_schema_info_spec.rb | 2 +- spec/workers/bulk_imports/pipeline_batch_worker_spec.rb | 2 +- spec/workers/bulk_imports/pipeline_worker_spec.rb | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git 
a/spec/lib/bulk_imports/pipeline_schema_info_spec.rb b/spec/lib/bulk_imports/pipeline_schema_info_spec.rb index b11a4147c285f6..45dd92ca26da99 100644 --- a/spec/lib/bulk_imports/pipeline_schema_info_spec.rb +++ b/spec/lib/bulk_imports/pipeline_schema_info_spec.rb @@ -13,7 +13,7 @@ describe '#db_schema' do context 'when pipeline defines a relation name which is an association' do it 'returns the schema name of the table used by the association' do - expect(subject.db_schema).to eq(:gitlab_main) + expect(subject.db_schema).to eq(:gitlab_main_cell) end end diff --git a/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb b/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb index ef9c074e1ffcf8..c459c17b1bcaea 100644 --- a/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb +++ b/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb @@ -239,7 +239,7 @@ def self.file_extraction_pipeline? schema, table = block.call([job_args]) - expect(schema).to eq(:gitlab_main) + expect(schema).to eq(:gitlab_main_cell) expect(table).to eq(['labels']) end diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb index 873ff0b989b3b8..d99b3e9de732b9 100644 --- a/spec/workers/bulk_imports/pipeline_worker_spec.rb +++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb @@ -196,7 +196,7 @@ def self.file_extraction_pipeline? schema, table = block.call([job_args]) - expect(schema).to eq(:gitlab_main) + expect(schema).to eq(:gitlab_main_cell) expect(table).to eq(['labels']) end -- GitLab