From 6823e44dcce0bdd9c5dea4d3cd8ba38e4a28a261 Mon Sep 17 00:00:00 2001
From: Luke Duncalfe
Date: Mon, 6 Nov 2023 16:18:01 +1300
Subject: [PATCH 1/4] Limit concurrent BulkImports::PipelineBatchWorker

This limits the number of concurrent in-progress
`BulkImports::PipelineBatchWorker` workers that will run for a single
Direct Transfer migration.

This is a performance measure to avoid any one Direct Transfer
migration over-saturating resources.

https://gitlab.com/gitlab-org/gitlab/-/issues/429863
---
 app/helpers/application_settings_helper.rb    |   1 +
 app/models/application_setting.rb             |   4 +
 app/models/bulk_imports/batch_tracker.rb      |   7 +
 app/models/bulk_imports/export_status.rb      |  45 ++++-
 .../finish_batched_pipeline_worker.rb         |   2 +-
 app/workers/bulk_imports/pipeline_worker.rb   |  71 +++++---
 .../bulk_import_limit_concurrent_batches.yml  |   8 +
 ...231108132916_index_batch_tracker_status.rb |  17 ++
 ...fer_batch_limit_to_application_settings.rb |   9 +
 db/schema_migrations/20231108132916           |   1 +
 db/schema_migrations/20231108143957           |   1 +
 db/structure.sql                              |   3 +
 doc/api/settings.md                           |   7 +-
 lib/api/settings.rb                           |   1 +
 spec/models/application_setting_spec.rb       |   8 +
 .../models/bulk_imports/batch_tracker_spec.rb |  15 ++
 .../models/bulk_imports/export_status_spec.rb | 109 +++++++++++-
 spec/requests/api/settings_spec.rb            |   3 +
 .../bulk_imports/pipeline_worker_spec.rb      | 159 ++++++++++++++++--
 19 files changed, 429 insertions(+), 42 deletions(-)
 create mode 100644 config/feature_flags/development/bulk_import_limit_concurrent_batches.yml
 create mode 100644 db/migrate/20231108132916_index_batch_tracker_status.rb
 create mode 100644 db/migrate/20231108143957_add_concurrent_direct_transfer_batch_limit_to_application_settings.rb
 create mode 100644 db/schema_migrations/20231108132916
 create mode 100644 db/schema_migrations/20231108143957

diff --git a/app/helpers/application_settings_helper.rb b/app/helpers/application_settings_helper.rb
index a2a850ae05ff36..655fdf8b8ec443 100644
--- a/app/helpers/application_settings_helper.rb
+++ b/app/helpers/application_settings_helper.rb
@@ -497,6 +497,7 @@ def visible_attributes
       :pipeline_limit_per_project_user_sha,
       :invitation_flow_enforcement,
       :can_create_group,
+      :bulk_import_concurrent_pipeline_batch_limit,
       :bulk_import_enabled,
       :bulk_import_max_download_file_size,
       :allow_runner_registration_token,
diff --git a/app/models/application_setting.rb b/app/models/application_setting.rb
index 7942b2b9236b4b..d544842edd09c3 100644
--- a/app/models/application_setting.rb
+++ b/app/models/application_setting.rb
@@ -813,6 +813,10 @@ def self.kroki_formats_attributes
             allow_nil: false,
             inclusion: { in: [true, false], message: N_('must be a boolean value') }
 
+  validates :bulk_import_concurrent_pipeline_batch_limit,
+            presence: true,
+            numericality: { only_integer: true, greater_than: 0 }
+
   validates :allow_runner_registration_token,
             allow_nil: false,
             inclusion: { in: [true, false], message: N_('must be a boolean value') }
diff --git a/app/models/bulk_imports/batch_tracker.rb b/app/models/bulk_imports/batch_tracker.rb
index 8561dfacb6e7e4..f8ef51405a3615 100644
--- a/app/models/bulk_imports/batch_tracker.rb
+++ b/app/models/bulk_imports/batch_tracker.rb
@@ -8,7 +8,14 @@ class BatchTracker < ApplicationRecord
 
     validates :batch_number, presence: true, uniqueness: { scope: :tracker_id }
 
+    IN_PROGRESS_STATES = %i[created started].freeze
+
     scope :by_last_updated, -> { order(updated_at: :desc) }
+    scope :in_progress, -> { with_status(IN_PROGRESS_STATES) }
+
+    def self.pluck_batch_numbers
+      pluck(:batch_number)
+    end
 
     state_machine :status, initial: :created do
       state :created, value: 0
diff --git a/app/models/bulk_imports/export_status.rb b/app/models/bulk_imports/export_status.rb
index 3d820e65d5b192..9e3815e75699c3 100644
--- a/app/models/bulk_imports/export_status.rb
+++ b/app/models/bulk_imports/export_status.rb
@@ -4,6 +4,8 @@ module BulkImports
   class ExportStatus
     include Gitlab::Utils::StrongMemoize
 
+    CACHE_KEY = 'bulk_imports/export_status/%{entity_id}/%{relation}'
+
     def initialize(pipeline_tracker, relation)
       @pipeline_tracker = pipeline_tracker
       @relation = relation
@@ -50,11 +52,12 @@ def batch(batch_number)
 
     def status
       strong_memoize(:status) do
-        status = fetch_status
-
-        next status if status.is_a?(Hash) || status.nil?
+        # As an optimization, once an export status has finished or failed it will
+        # be cached, so we do not fetch from the remote source again.
+        status = status_from_cache
+        next status if status
 
-        status.find { |item| item['relation'] == relation }
+        status_from_remote
       rescue BulkImports::NetworkError => e
         raise BulkImports::RetryPipelineError.new(e.message, 2.seconds) if e.retriable?(pipeline_tracker)
 
@@ -64,8 +67,38 @@
       end
     end
 
-    def fetch_status
-      client.get(status_endpoint, relation: relation).parsed_response
+    def status_from_cache
+      status = Gitlab::Cache::Import::Caching.read(cache_key)
+
+      Gitlab::Json.parse(status) if status
+    end
+
+    def status_from_remote
+      raw_status = client.get(status_endpoint, relation: relation).parsed_response
+
+      parse_status_from_remote(raw_status).tap do |status|
+        cache_status(status) if cache_status?(status)
+      end
+    end
+
+    def parse_status_from_remote(status)
+      # Non-batched status
+      return status if status.is_a?(Hash) || status.nil?
+
+      # Batched status
+      status.find { |item| item['relation'] == relation }
+    end
+
+    def cache_status?(status)
+      status.present? && status['status'].in?([Export::FINISHED, Export::FAILED])
+    end
+
+    def cache_status(status)
+      Gitlab::Cache::Import::Caching.write(cache_key, status.to_json)
+    end
+
+    def cache_key
+      Kernel.format(CACHE_KEY, entity_id: entity.id, relation: relation)
     end
 
     def status_endpoint
diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
index 980cdb98f329d0..b832f1bf2e5c46 100644
--- a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
+++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
@@ -42,7 +42,7 @@ def re_enqueue
     end
 
     def import_in_progress?
-      sorted_batches.any? { |b| b.started? || b.created? }
+      sorted_batches.in_progress.any?
     end
 
     def most_recent_batch_stale?
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index bd86970844bad5..3f17f56cefeeef 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -3,17 +3,21 @@
 module BulkImports
   class PipelineWorker
     include ApplicationWorker
-    include ExclusiveLeaseGuard
+    include Gitlab::Utils::StrongMemoize
 
     FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds
 
+    LimitedBatches = Struct.new(:numbers, :final?, keyword_init: true).freeze
+
     DEFER_ON_HEALTH_DELAY = 5.minutes
 
     data_consistency :always
     feature_category :importers
     sidekiq_options dead: false, retry: 6
     worker_has_external_dependencies!
-    deduplicate :until_executing
+    # Ensure duplicate pipeline workers run in serial to avoid creating too
+    # many batches at once.
+    deduplicate :until_executed, if_deduplicated: :reschedule_once
     worker_resource_boundary :memory
     idempotent!
@@ -49,13 +53,11 @@ def perform(pipeline_tracker_id, _stage, entity_id)
 
       log_extra_metadata_on_done(:pipeline_class, @pipeline_tracker.pipeline_name)
 
-      try_obtain_lease do
-        if pipeline_tracker.enqueued? || pipeline_tracker.started?
-          logger.info(log_attributes(message: 'Pipeline starting'))
+      return unless pipeline_tracker.enqueued? || pipeline_tracker.started?
 
-          run
-        end
-      end
+      logger.info(log_attributes(message: 'Pipeline starting'))
+
+      run
     end
 
     def perform_failure(pipeline_tracker_id, entity_id, exception)
@@ -84,7 +86,8 @@ def run
 
         return pipeline_tracker.finish! if export_status.batches_count < 1
 
-        enqueue_batches
+        enqueue_limited_batches
+        re_enqueue unless all_batches_enqueued?
       else
         log_extra_metadata_on_done(:batched, false)
@@ -210,22 +213,52 @@ def time_since_tracker_created
       Time.zone.now - (pipeline_tracker.created_at || entity.created_at)
     end
 
-    def lease_timeout
-      30
+    def enqueue_limited_batches
+      next_batch.numbers.each do |batch_number|
+        batch = pipeline_tracker.batches.create!(batch_number: batch_number)
+
+        with_context(bulk_import_entity_id: entity.id) do
+          ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
+        end
+      end
+
+      log_extra_metadata_on_done(:tracker_batch_numbers_enqueued, next_batch.numbers)
+      log_extra_metadata_on_done(:tracker_final_batch_was_enqueued, next_batch.final?)
     end
 
-    def lease_key
-      "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}"
+    def all_batches_enqueued?
+      next_batch.final?
     end
 
-    def enqueue_batches
-      1.upto(export_status.batches_count) do |batch_number|
-        batch = pipeline_tracker.batches.find_or_create_by!(batch_number: batch_number) # rubocop:disable CodeReuse/ActiveRecord
+    def next_batch
+      all_batch_numbers = (1..export_status.batches_count).to_a
 
-        with_context(bulk_import_entity_id: entity.id) do
-          ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
-        end
+      created_batch_numbers = pipeline_tracker.batches.pluck_batch_numbers
+
+      remaining_batch_numbers = all_batch_numbers - created_batch_numbers
+
+      if Feature.disabled?(:bulk_import_limit_concurrent_batches, context.portable)
+        return LimitedBatches.new(numbers: remaining_batch_numbers, final?: true)
+      end
+
+      limit = next_batch_count
+
+      LimitedBatches.new(
+        numbers: remaining_batch_numbers.first(limit),
+        final?: remaining_batch_numbers.count <= limit
+      )
+    end
+    strong_memoize_attr :next_batch
+
+    # Calculate the number of batches, up to `batch_limit`, to process in the
+    # next round.
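+    # For example, with a `batch_limit` of 25 and 10 batches still in
+    # progress, this returns 15, so at most 15 new batches are enqueued
+    # this round and the worker re-enqueues itself for the rest.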
+    def next_batch_count
+      limit = batch_limit - pipeline_tracker.batches.in_progress.limit(batch_limit).count
+      [limit, 0].max
+    end
+
+    def batch_limit
+      ::Gitlab::CurrentSettings.bulk_import_concurrent_pipeline_batch_limit
+    end
   end
 end
diff --git a/config/feature_flags/development/bulk_import_limit_concurrent_batches.yml b/config/feature_flags/development/bulk_import_limit_concurrent_batches.yml
new file mode 100644
index 00000000000000..4bbd0bd5773da7
--- /dev/null
+++ b/config/feature_flags/development/bulk_import_limit_concurrent_batches.yml
@@ -0,0 +1,8 @@
+---
+name: bulk_import_limit_concurrent_batches
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/136018
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/431561
+milestone: '16.7'
+type: development
+group: group::import and integrate
+default_enabled: false
diff --git a/db/migrate/20231108132916_index_batch_tracker_status.rb b/db/migrate/20231108132916_index_batch_tracker_status.rb
new file mode 100644
index 00000000000000..4db444018e8093
--- /dev/null
+++ b/db/migrate/20231108132916_index_batch_tracker_status.rb
@@ -0,0 +1,17 @@
+# frozen_string_literal: true
+
+class IndexBatchTrackerStatus < Gitlab::Database::Migration[2.2]
+  disable_ddl_transaction!
+
+  milestone '16.7'
+
+  INDEX_NAME = 'index_batch_trackers_on_status'
+
+  def up
+    add_concurrent_index :bulk_import_batch_trackers, :status, name: INDEX_NAME
+  end
+
+  def down
+    remove_concurrent_index_by_name :bulk_import_batch_trackers, INDEX_NAME
+  end
+end
diff --git a/db/migrate/20231108143957_add_concurrent_direct_transfer_batch_limit_to_application_settings.rb b/db/migrate/20231108143957_add_concurrent_direct_transfer_batch_limit_to_application_settings.rb
new file mode 100644
index 00000000000000..064385e02fcfe0
--- /dev/null
+++ b/db/migrate/20231108143957_add_concurrent_direct_transfer_batch_limit_to_application_settings.rb
@@ -0,0 +1,9 @@
+# frozen_string_literal: true
+
+class AddConcurrentDirectTransferBatchLimitToApplicationSettings < Gitlab::Database::Migration[2.2]
+  milestone '16.7'
+
+  def change
+    add_column :application_settings, :bulk_import_concurrent_pipeline_batch_limit, :smallint, default: 25, null: false
+  end
+end
diff --git a/db/schema_migrations/20231108132916 b/db/schema_migrations/20231108132916
new file mode 100644
index 00000000000000..a7c7d98f18c271
--- /dev/null
+++ b/db/schema_migrations/20231108132916
@@ -0,0 +1 @@
+4f67f8ebf48cb7ea22e5451c3b548a5f7dc59b0e2b29d51ac73a04860214a25f
\ No newline at end of file
diff --git a/db/schema_migrations/20231108143957 b/db/schema_migrations/20231108143957
new file mode 100644
index 00000000000000..ec3f916ea2eb18
--- /dev/null
+++ b/db/schema_migrations/20231108143957
@@ -0,0 +1 @@
+fc18cfa407a2270af8be9de77b5078544e27afb38e4ad87f3b2c06e24f58add0
\ No newline at end of file
diff --git a/db/structure.sql b/db/structure.sql
index c3c25cfe267def..28ff694611a962 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -12276,6 +12276,7 @@ CREATE TABLE application_settings (
     update_namespace_name_rate_limit smallint DEFAULT 120 NOT NULL,
     pre_receive_secret_detection_enabled boolean DEFAULT false NOT NULL,
     can_create_organization boolean DEFAULT true NOT NULL,
+    bulk_import_concurrent_pipeline_batch_limit smallint DEFAULT 25 NOT NULL,
     web_ide_oauth_application_id integer,
     instance_level_ai_beta_features_enabled boolean DEFAULT false NOT NULL,
     security_txt_content text,
@@ -31873,6 +31874,8 @@ CREATE INDEX index_badges_on_group_id ON badges USING btree (group_id);
 
 CREATE INDEX index_badges_on_project_id ON badges USING btree (project_id);
 
+CREATE INDEX index_batch_trackers_on_status ON bulk_import_batch_trackers USING btree (status);
+
 CREATE INDEX index_batched_background_migrations_on_status ON batched_background_migrations USING btree (status);
 
 CREATE UNIQUE INDEX index_batched_background_migrations_on_unique_configuration ON batched_background_migrations USING btree (job_class_name, table_name, column_name, job_arguments);
diff --git a/doc/api/settings.md b/doc/api/settings.md
index 0af3444febdcfa..cf34cf3d65e17d 100644
--- a/doc/api/settings.md
+++ b/doc/api/settings.md
@@ -126,7 +126,8 @@ Example response:
   "package_registry_allow_anyone_to_pull_option": true,
   "bulk_import_max_download_file_size": 5120,
   "project_jobs_api_rate_limit": 600,
-  "security_txt_content": null
+  "security_txt_content": null,
+  "bulk_import_concurrent_pipeline_batch_limit": 25
 }
 ```
 
@@ -272,7 +273,8 @@ Example response:
   "package_registry_allow_anyone_to_pull_option": true,
   "bulk_import_max_download_file_size": 5120,
   "project_jobs_api_rate_limit": 600,
-  "security_txt_content": null
+  "security_txt_content": null,
+  "bulk_import_concurrent_pipeline_batch_limit": 25
 }
 ```
 
@@ -621,6 +623,7 @@ listed in the descriptions of the relevant settings.
 | `valid_runner_registrars` | array of strings | no | List of types which are allowed to register a GitLab Runner. Can be `[]`, `['group']`, `['project']` or `['group', 'project']`. |
 | `whats_new_variant` | string | no | What's new variant, possible values: `all_tiers`, `current_tier`, and `disabled`. |
 | `wiki_page_max_content_bytes` | integer | no | Maximum wiki page content size in **bytes**. Default: 52428800 Bytes (50 MB). The minimum value is 1024 bytes. |
+| `bulk_import_concurrent_pipeline_batch_limit` | integer | no | Maximum simultaneous Direct Transfer batches to process. |
 
 ### Configure inactive project deletion
 
diff --git a/lib/api/settings.rb b/lib/api/settings.rb
index 162e24af0994cc..9de9f19ccd752b 100644
--- a/lib/api/settings.rb
+++ b/lib/api/settings.rb
@@ -212,6 +212,7 @@ def filter_attributes_using_license(attrs)
         optional :pipeline_limit_per_project_user_sha, type: Integer, desc: "Maximum number of pipeline creation requests allowed per minute per user and commit. Set to 0 for unlimited requests per minute."
         optional :jira_connect_application_key, type: String, desc: "Application ID of the OAuth application that should be used to authenticate with the GitLab for Jira Cloud app"
         optional :jira_connect_proxy_url, type: String, desc: "URL of the GitLab instance that should be used as a proxy for the GitLab for Jira Cloud app"
+        optional :bulk_import_concurrent_pipeline_batch_limit, type: Integer, desc: 'Maximum simultaneous Direct Transfer pipeline batches to process'
         optional :bulk_import_enabled, type: Boolean, desc: 'Enable migrating GitLab groups and projects by direct transfer'
         optional :bulk_import_max_download_file, type: Integer, desc: 'Maximum download file size in MB when importing from source GitLab instances by direct transfer'
         optional :allow_runner_registration_token, type: Boolean, desc: 'Allow registering runners using a registration token'
diff --git a/spec/models/application_setting_spec.rb b/spec/models/application_setting_spec.rb
index 05e3bffbeaf155..d16a78be53318f 100644
--- a/spec/models/application_setting_spec.rb
+++ b/spec/models/application_setting_spec.rb
@@ -26,6 +26,7 @@
     it { expect(setting.default_branch_protection_defaults).to eq({}) }
     it { expect(setting.max_decompressed_archive_size).to eq(25600) }
     it { expect(setting.decompress_archive_file_timeout).to eq(210) }
+    it { expect(setting.bulk_import_concurrent_pipeline_batch_limit).to eq(25) }
   end
 
   describe 'validations' do
@@ -1351,6 +1352,13 @@ def expect_invalid
           .with_message("must be a value between 0 and 1")
       end
     end
+
+    describe 'bulk_import_concurrent_pipeline_batch_limit' do
+      it do
+        is_expected.to validate_numericality_of(:bulk_import_concurrent_pipeline_batch_limit)
+          .is_greater_than(0)
+      end
+    end
   end
 
   context 'restrict creating duplicates' do
diff --git a/spec/models/bulk_imports/batch_tracker_spec.rb b/spec/models/bulk_imports/batch_tracker_spec.rb
index 336943228c742f..1c7cbc0cb8c4cc 100644
--- a/spec/models/bulk_imports/batch_tracker_spec.rb
+++ b/spec/models/bulk_imports/batch_tracker_spec.rb
@@ -13,4 +13,19 @@
     it { is_expected.to validate_presence_of(:batch_number) }
     it { is_expected.to validate_uniqueness_of(:batch_number).scoped_to(:tracker_id) }
   end
+
+  describe 'scopes' do
+    describe '.in_progress' do
+      it 'returns only batches that are in progress' do
+        created = create(:bulk_import_batch_tracker, :created)
+        started = create(:bulk_import_batch_tracker, :started)
+        create(:bulk_import_batch_tracker, :finished)
+        create(:bulk_import_batch_tracker, :timeout)
+        create(:bulk_import_batch_tracker, :failed)
+        create(:bulk_import_batch_tracker, :skipped)
+
+        expect(described_class.in_progress).to contain_exactly(created, started)
+      end
+    end
+  end
 end
diff --git a/spec/models/bulk_imports/export_status_spec.rb b/spec/models/bulk_imports/export_status_spec.rb
index e7a037b54be976..aa3bce785344cf 100644
--- a/spec/models/bulk_imports/export_status_spec.rb
+++ b/spec/models/bulk_imports/export_status_spec.rb
@@ -2,7 +2,7 @@
 
 require 'spec_helper'
 
-RSpec.describe BulkImports::ExportStatus, feature_category: :importers do
+RSpec.describe BulkImports::ExportStatus, :clean_gitlab_redis_cache, feature_category: :importers do
   let_it_be(:relation) { 'labels' }
   let_it_be(:import) { create(:bulk_import) }
   let_it_be(:config) { create(:bulk_import_configuration, bulk_import: import) }
@@ -282,4 +282,111 @@
       end
     end
   end
+
+  describe 'caching' do
+    let(:cached_status) do
+      subject.send(:status)
+      subject.send(:status_from_cache)
+    end
+
+    shared_examples 'does not result in a cached status' do
+      specify do
+        expect(cached_status).to be_nil
+      end
+    end
+
+    shared_examples 'results in a cached status' do
+      specify do
+        expect(cached_status).to include('status' => status)
+      end
+
+      context 'when something goes wrong during export status fetch' do
+        before do
+          allow_next_instance_of(BulkImports::Clients::HTTP) do |client|
+            allow(client).to receive(:get).and_raise(
+              BulkImports::NetworkError.new("Unsuccessful response", response: nil)
+            )
+          end
+        end
+
+        include_examples 'does not result in a cached status'
+      end
+    end
+
+    context 'when export status is started' do
+      let(:status) { BulkImports::Export::STARTED }
+
+      it_behaves_like 'does not result in a cached status'
+    end
+
+    context 'when export status is failed' do
+      let(:status) { BulkImports::Export::FAILED }
+
+      it_behaves_like 'results in a cached status'
+    end
+
+    context 'when export status is finished' do
+      let(:status) { BulkImports::Export::FINISHED }
+
+      it_behaves_like 'results in a cached status'
+    end
+
+    context 'when export status is not present' do
+      let(:status) { nil }
+
+      it_behaves_like 'does not result in a cached status'
+    end
+
+    context 'when the cache is empty' do
+      let(:status) { BulkImports::Export::FAILED }
+
+      it 'fetches the status from the remote' do
+        expect(subject).to receive(:status_from_remote).and_call_original
+        expect(subject.send(:status)).to include('status' => status)
+      end
+    end
+
+    context 'when the cache is not empty' do
+      let(:status) { BulkImports::Export::FAILED }
+
+      before do
+        Gitlab::Cache::Import::Caching.write(
+          described_class.new(tracker, 'labels').send(:cache_key),
+          { 'status' => 'mock status' }.to_json
+        )
+      end
+
+      it 'does not fetch the status from the remote' do
+        expect(subject).not_to receive(:status_from_remote)
+        expect(subject.send(:status)).to eq({ 'status' => 'mock status' })
+      end
+
+      context 'with a different entity' do
+        before do
+          tracker.entity = create(:bulk_import_entity, bulk_import: import, source_full_path: 'foo')
+        end
+
+        it 'fetches the status from the remote' do
+          expect(subject).to receive(:status_from_remote).and_call_original
+          expect(subject.send(:status)).to include('status' => status)
+        end
+      end
+
+      context 'with a different relation' do
+        let_it_be(:relation) { 'merge_requests' }
+
+        let(:response_double) do
+          instance_double(HTTParty::Response, parsed_response: [
+            { 'relation' => 'labels', 'status' => status },
+            { 'relation' => 'merge_requests', 'status' => status }
+          ])
+        end
+
+        it 'fetches the status from the remote' do
+          expect(subject).to receive(:status_from_remote).and_call_original
+          expect(subject.send(:status)).to include('status' => status)
+        end
+      end
+    end
+  end
 end
diff --git a/spec/requests/api/settings_spec.rb b/spec/requests/api/settings_spec.rb
index c304ae514a6067..4e24689c17a13d 100644
--- a/spec/requests/api/settings_spec.rb
+++ b/spec/requests/api/settings_spec.rb
@@ -93,6 +93,7 @@
       expect(json_response['default_branch_protection_defaults']).to be_kind_of(Hash)
       expect(json_response['max_login_attempts']).to be_nil
       expect(json_response['failed_login_attempts_unlock_period_in_minutes']).to be_nil
+      expect(json_response['bulk_import_concurrent_pipeline_batch_limit']).to eq(25)
     end
   end
 
@@ -202,6 +203,7 @@
         jira_connect_proxy_url: 'http://example.com',
         bulk_import_enabled: false,
         bulk_import_max_download_file_size: 1,
+        bulk_import_concurrent_pipeline_batch_limit: 2,
         allow_runner_registration_token: true,
         user_defaults_to_private_profile: true,
         default_syntax_highlighting_theme: 2,
@@ -296,6 +298,7 @@
       expect(json_response['max_import_remote_file_size']).to be(2)
       expect(json_response['bulk_import_max_download_file_size']).to be(1)
       expect(json_response['security_txt_content']).to be(nil)
+      expect(json_response['bulk_import_concurrent_pipeline_batch_limit']).to be(2)
     end
   end
 
diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb
index 09f76169a12b76..f39285f598e771 100644
--- a/spec/workers/bulk_imports/pipeline_worker_spec.rb
+++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb
@@ -48,7 +48,7 @@ def self.abort_on_failure?
     end
   end
 
-  include_examples 'an idempotent worker' do
+  it_behaves_like 'an idempotent worker' do
     let(:job_args) { [pipeline_tracker.id, pipeline_tracker.stage, entity.id] }
 
     it 'runs the pipeline and sets tracker to finished' do
@@ -88,15 +88,6 @@
     expect(pipeline_tracker.jid).to eq('jid')
   end
 
-  context 'when exclusive lease cannot be obtained' do
-    it 'does not run the pipeline' do
-      expect(worker).to receive(:try_obtain_lease).and_return(false)
-      expect(worker).not_to receive(:run)
-
-      worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
-    end
-  end
-
   describe '.sidekiq_retries_exhausted' do
     it 'logs and sets status as failed' do
       job = { 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id] }
@@ -516,8 +507,8 @@ def self.relation
       end
     end
 
-    context 'when export is batched' do
-      let(:batches_count) { 2 }
+    context 'when export is batched', :aggregate_failures do
+      let(:batches_count) { 3 }
 
       before do
         allow_next_instance_of(BulkImports::ExportStatus) do |status|
@@ -527,10 +518,30 @@ def self.relation
           allow(status).to receive(:empty?).and_return(false)
          allow(status).to receive(:failed?).and_return(false)
         end
+        allow(worker).to receive(:log_extra_metadata_on_done).and_call_original
       end
 
      it 'enqueues pipeline batches' do
+        expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).exactly(3).times
+        expect(worker).to receive(:log_extra_metadata_on_done).with(:batched, true)
+        expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2, 3])
+        expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+        worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+        pipeline_tracker.reload
+
+        expect(pipeline_tracker.status_name).to eq(:started)
+        expect(pipeline_tracker.batched).to eq(true)
+        expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+        expect(described_class.jobs).to be_empty
+      end
+
+      it 'enqueues only missing pipelines batches' do
+        create(:bulk_import_batch_tracker, tracker: pipeline_tracker, batch_number: 2)
         expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+        expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 3])
+        expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
 
         worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
 
@@ -538,7 +549,8 @@
 
        expect(pipeline_tracker.status_name).to eq(:started)
        expect(pipeline_tracker.batched).to eq(true)
-        expect(pipeline_tracker.batches.count).to eq(batches_count)
+        expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+        expect(described_class.jobs).to be_empty
       end
 
       context 'when batches count is less than 1' do
@@ -552,6 +564,127 @@
 
         expect(pipeline_tracker.reload.status_name).to eq(:finished)
        end
      end
+
+      context 'when pipeline batch enqueuing should be limited' do
+        using RSpec::Parameterized::TableSyntax
+
+        before do
+          allow(::Gitlab::CurrentSettings).to receive(:bulk_import_concurrent_pipeline_batch_limit).and_return(2)
+        end
+
+        it 'only enqueues limited batches and reenqueues itself' do
+          expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+          expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2])
+          expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, false)
+
+          worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+          pipeline_tracker.reload
+
+          expect(pipeline_tracker.status_name).to eq(:started)
+          expect(pipeline_tracker.batched).to eq(true)
+          expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2)
+          expect(described_class.jobs).to contain_exactly(
+            hash_including(
+              'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id],
+              'scheduled_at' => be_within(1).of(10.seconds.from_now.to_i)
+            )
+          )
+        end
+
+        context 'when there is a batch in progress' do
+          where(:status) { BulkImports::BatchTracker::IN_PROGRESS_STATES }
+
+          with_them do
+            before do
+              create(:bulk_import_batch_tracker, status, batch_number: 1, tracker: pipeline_tracker)
+            end
+
+            it 'counts the in progress batch against the limit' do
+              expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).once
+              expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [2])
+              expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, false)
+
+              worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+              pipeline_tracker.reload
+
+              expect(pipeline_tracker.status_name).to eq(:started)
+              expect(pipeline_tracker.batched).to eq(true)
+              expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2)
+              expect(described_class.jobs).to contain_exactly(
+                hash_including(
+                  'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id],
+                  'scheduled_at' => be_within(1).of(10.seconds.from_now.to_i)
+                )
+              )
+            end
+          end
+        end
+
+        context 'when there is a batch that has finished' do
+          where(:status) do
+            all_statuses = BulkImports::BatchTracker.state_machines[:status].states.map(&:name)
+            all_statuses - BulkImports::BatchTracker::IN_PROGRESS_STATES
+          end
+
+          with_them do
+            before do
+              create(:bulk_import_batch_tracker, status, batch_number: 1, tracker: pipeline_tracker)
+            end
+
+            it 'does not count the finished batch against the limit' do
+              expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+              expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [2, 3])
+              expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+              worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+              pipeline_tracker.reload
+
+              expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+              expect(described_class.jobs).to be_empty
+            end
+          end
+        end
+
+        context 'when the feature flag is disabled' do
+          before do
+            stub_feature_flags(bulk_import_limit_concurrent_batches: false)
+          end
+
+          it 'does not limit batches' do
+            expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).exactly(3).times
+            expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2, 3])
+            expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+            worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+            pipeline_tracker.reload
+
+            expect(pipeline_tracker.status_name).to eq(:started)
+            expect(pipeline_tracker.batched).to eq(true)
+            expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+            expect(described_class.jobs).to be_empty
+          end
+
+          it 'still enqueues only missing pipelines batches' do
+            create(:bulk_import_batch_tracker, tracker: pipeline_tracker, batch_number: 2)
+            expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+            expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 3])
+            expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+            worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+            pipeline_tracker.reload
+
+            expect(pipeline_tracker.status_name).to eq(:started)
+            expect(pipeline_tracker.batched).to eq(true)
+            expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+            expect(described_class.jobs).to be_empty
+          end
+        end
+      end
     end
   end
 end
-- 
GitLab


From 6967ed734a9237e61548dc6c9a79f8471a976265 Mon Sep 17 00:00:00 2001
From: Luke Duncalfe
Date: Mon, 11 Dec 2023 13:13:11 +1300
Subject: [PATCH 2/4] Add DB reviewer feedback

---
 db/migrate/20231108132916_index_batch_tracker_status.rb | 4 ++--
 db/structure.sql                                        | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/db/migrate/20231108132916_index_batch_tracker_status.rb b/db/migrate/20231108132916_index_batch_tracker_status.rb
index 4db444018e8093..099cbae6fc1229 100644
--- a/db/migrate/20231108132916_index_batch_tracker_status.rb
+++ b/db/migrate/20231108132916_index_batch_tracker_status.rb
@@ -5,10 +5,10 @@ class IndexBatchTrackerStatus < Gitlab::Database::Migration[2.2]
 
   milestone '16.7'
 
-  INDEX_NAME = 'index_batch_trackers_on_status'
+  INDEX_NAME = 'index_batch_trackers_on_tracker_id_status'
 
   def up
-    add_concurrent_index :bulk_import_batch_trackers, :status, name: INDEX_NAME
+    add_concurrent_index :bulk_import_batch_trackers, [:tracker_id, :status], name: INDEX_NAME
   end
 
   def down
diff --git a/db/structure.sql b/db/structure.sql
index 28ff694611a962..8c43d4a6acf017 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -31874,7 +31874,7 @@ CREATE INDEX index_badges_on_group_id ON badges USING btree (group_id);
 
 CREATE INDEX index_badges_on_project_id ON badges USING btree (project_id);
 
-CREATE INDEX index_batch_trackers_on_status ON bulk_import_batch_trackers USING btree (status);
+CREATE INDEX index_batch_trackers_on_tracker_id_status ON bulk_import_batch_trackers USING btree (tracker_id, status);
 
 CREATE INDEX index_batched_background_migrations_on_status ON batched_background_migrations USING btree (status);
-- 
GitLab


From 8c8cccfb6ca92241de9899eb5d18c46e3259952e Mon Sep 17 00:00:00 2001
From: Luke Duncalfe
Date: Mon, 11 Dec 2023 13:39:01 +1300
Subject: [PATCH 3/4] Add backend reviewer feedback

---
 app/workers/bulk_imports/pipeline_worker.rb | 24 ++++++++++++-------
 .../bulk_imports/pipeline_worker_spec.rb    |  9 +++++++
 2 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 3f17f56cefeeef..21d8b75aa93e45 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -3,6 +3,7 @@
 module BulkImports
   class PipelineWorker
     include ApplicationWorker
+    include ExclusiveLeaseGuard
     include Gitlab::Utils::StrongMemoize
 
     FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds
@@ -15,9 +16,7 @@ class PipelineWorker
     feature_category :importers
     sidekiq_options dead: false, retry: 6
     worker_has_external_dependencies!
-    # Ensure duplicate pipeline workers run in serial to avoid creating too
-    # many batches at once.
-    deduplicate :until_executed, if_deduplicated: :reschedule_once
+    deduplicate :until_executing
     worker_resource_boundary :memory
     idempotent!
 
@@ -53,12 +52,12 @@ def perform(pipeline_tracker_id, _stage, entity_id)
 
       log_extra_metadata_on_done(:pipeline_class, @pipeline_tracker.pipeline_name)
 
-      return unless pipeline_tracker.enqueued? || pipeline_tracker.started?
-
-      logger.info(log_attributes(message: 'Pipeline starting'))
-
-      run
+      try_obtain_lease do
+        if pipeline_tracker.enqueued? || pipeline_tracker.started?
+          logger.info(log_attributes(message: 'Pipeline starting'))
+          run
+        end
+      end
     end
 
     def perform_failure(pipeline_tracker_id, entity_id, exception)
@@ -260,5 +260,13 @@ def next_batch_count
     def batch_limit
       ::Gitlab::CurrentSettings.bulk_import_concurrent_pipeline_batch_limit
     end
+
+    def lease_timeout
+      30
+    end
+
+    def lease_key
+      "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}"
+    end
   end
 end
diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb
index f39285f598e771..bda07a956933ab 100644
--- a/spec/workers/bulk_imports/pipeline_worker_spec.rb
+++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb
@@ -88,6 +88,15 @@
     expect(pipeline_tracker.jid).to eq('jid')
   end
 
+  context 'when exclusive lease cannot be obtained' do
+    it 'does not run the pipeline' do
+      expect(worker).to receive(:try_obtain_lease).and_return(false)
+      expect(worker).not_to receive(:run)
+
+      worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+    end
+  end
+
   describe '.sidekiq_retries_exhausted' do
     it 'logs and sets status as failed' do
       job = { 'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id] }
-- 
GitLab


From 1fd385e9e5af054dc249e0583ac29ac65780470f Mon Sep 17 00:00:00 2001
From: Luke Duncalfe
Date: Tue, 12 Dec 2023 14:15:25 +1300
Subject: [PATCH 4/4] Disable Database/AvoidUsingPluckWithoutLimit cop

---
 app/models/bulk_imports/batch_tracker.rb | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/app/models/bulk_imports/batch_tracker.rb b/app/models/bulk_imports/batch_tracker.rb
index f8ef51405a3615..09f220b96b0b1e 100644
--- a/app/models/bulk_imports/batch_tracker.rb
+++ b/app/models/bulk_imports/batch_tracker.rb
@@ -13,9 +13,12 @@ class BatchTracker < ApplicationRecord
     scope :by_last_updated, -> { order(updated_at: :desc) }
     scope :in_progress, -> { with_status(IN_PROGRESS_STATES) }
 
+    # rubocop: disable Database/AvoidUsingPluckWithoutLimit -- We should use this method only when scoped to a tracker.
+    # Batches are self-limiting per tracker based on the amount of data being imported.
     def self.pluck_batch_numbers
       pluck(:batch_number)
     end
+    # rubocop: enable Database/AvoidUsingPluckWithoutLimit
 
     state_machine :status, initial: :created do
       state :created, value: 0
-- 
GitLab
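
The following is a minimal, self-contained Ruby sketch of the limiting round
introduced by this series; it is not part of the patches themselves.
`BATCH_LIMIT` and the sample numbers are hypothetical stand-ins for the
`bulk_import_concurrent_pipeline_batch_limit` application setting and for the
tracker state that the real worker reads from Postgres.

# Standalone sketch of one batch-enqueueing round.
BATCH_LIMIT = 25

LimitedBatches = Struct.new(:numbers, :final?, keyword_init: true)

def next_batch(total_batches, created_numbers, in_progress_count)
  # Batch numbers that have not been created for this tracker yet.
  remaining = (1..total_batches).to_a - created_numbers

  # Free slots this round: the configured limit minus batches still
  # running, clamped at zero so a saturated tracker enqueues nothing.
  limit = [BATCH_LIMIT - in_progress_count, 0].max

  LimitedBatches.new(
    numbers: remaining.first(limit),
    final?: remaining.count <= limit
  )
end

# 60 batches in total, numbers 1..25 already created, 10 of them still running:
batch = next_batch(60, (1..25).to_a, 10)
batch.numbers # => [26, 27, ..., 40] (15 new batches this round)
batch.final?  # => false, so the worker re-enqueues itself for the rest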