diff --git a/app/helpers/application_settings_helper.rb b/app/helpers/application_settings_helper.rb
index a2a850ae05ff36230e5375a5730a8a8ad566094a..655fdf8b8ec443637ecee076eab742595fdda75e 100644
--- a/app/helpers/application_settings_helper.rb
+++ b/app/helpers/application_settings_helper.rb
@@ -497,6 +497,7 @@ def visible_attributes
       :pipeline_limit_per_project_user_sha,
       :invitation_flow_enforcement,
       :can_create_group,
+      :bulk_import_concurrent_pipeline_batch_limit,
       :bulk_import_enabled,
       :bulk_import_max_download_file_size,
       :allow_runner_registration_token,
diff --git a/app/models/application_setting.rb b/app/models/application_setting.rb
index 7942b2b9236b4b68974527b45cb7a72448a9687b..d544842edd09c334c24d157699022049ffd8d19c 100644
--- a/app/models/application_setting.rb
+++ b/app/models/application_setting.rb
@@ -813,6 +813,10 @@ def self.kroki_formats_attributes
     allow_nil: false,
     inclusion: { in: [true, false], message: N_('must be a boolean value') }
 
+  validates :bulk_import_concurrent_pipeline_batch_limit,
+    presence: true,
+    numericality: { only_integer: true, greater_than: 0 }
+
   validates :allow_runner_registration_token,
     allow_nil: false,
     inclusion: { in: [true, false], message: N_('must be a boolean value') }
diff --git a/app/models/bulk_imports/batch_tracker.rb b/app/models/bulk_imports/batch_tracker.rb
index 8561dfacb6e7e4f9268feef6ba864ac5f54013e5..09f220b96b0b1eebbf0a5bc7a5066be10c96ef6d 100644
--- a/app/models/bulk_imports/batch_tracker.rb
+++ b/app/models/bulk_imports/batch_tracker.rb
@@ -8,7 +8,17 @@ class BatchTracker < ApplicationRecord
 
     validates :batch_number, presence: true, uniqueness: { scope: :tracker_id }
 
+    IN_PROGRESS_STATES = %i[created started].freeze
+
     scope :by_last_updated, -> { order(updated_at: :desc) }
+    scope :in_progress, -> { with_status(IN_PROGRESS_STATES) }
+
+    # rubocop: disable Database/AvoidUsingPluckWithoutLimit -- We should use this method only when scoped to a tracker.
+    # Batches are self-limiting per tracker based on the amount of data being imported.
+    def self.pluck_batch_numbers
+      pluck(:batch_number)
+    end
+    # rubocop: enable Database/AvoidUsingPluckWithoutLimit
 
     state_machine :status, initial: :created do
       state :created, value: 0
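Annotation: the new scope and pluck helper above are consumed together by `PipelineWorker` later in this diff. A hedged sketch of how they compose (`pipeline_tracker` and `export_status` stand in for the worker's own objects; `with_status` is the scope generated by the `state_machines` ActiveRecord integration):

```ruby
# Illustration only: these calls mirror PipelineWorker#next_batch and
# #next_batch_count further down in this MR; nothing here is new API.
created_numbers   = pipeline_tracker.batches.pluck_batch_numbers   # bounded per tracker
remaining_numbers = (1..export_status.batches_count).to_a - created_numbers
in_flight_count   = pipeline_tracker.batches.in_progress.count     # :created or :started
```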
diff --git a/app/models/bulk_imports/export_status.rb b/app/models/bulk_imports/export_status.rb
index 3d820e65d5b192090d48002a72f88cc6eab199fe..9e3815e75699c3f3696edb102ba448135499c843 100644
--- a/app/models/bulk_imports/export_status.rb
+++ b/app/models/bulk_imports/export_status.rb
@@ -4,6 +4,8 @@ module BulkImports
   class ExportStatus
     include Gitlab::Utils::StrongMemoize
 
+    CACHE_KEY = 'bulk_imports/export_status/%{entity_id}/%{relation}'
+
     def initialize(pipeline_tracker, relation)
       @pipeline_tracker = pipeline_tracker
       @relation = relation
@@ -50,11 +52,12 @@ def batch(batch_number)
 
     def status
       strong_memoize(:status) do
-        status = fetch_status
-
-        next status if status.is_a?(Hash) || status.nil?
+        # As an optimization, once an export status has finished or failed it will
+        # be cached, so we do not fetch from the remote source again.
+        status = status_from_cache
+        next status if status
 
-        status.find { |item| item['relation'] == relation }
+        status_from_remote
       rescue BulkImports::NetworkError => e
         raise BulkImports::RetryPipelineError.new(e.message, 2.seconds) if e.retriable?(pipeline_tracker)
@@ -64,8 +67,38 @@ def status
       end
     end
 
-    def fetch_status
-      client.get(status_endpoint, relation: relation).parsed_response
+    def status_from_cache
+      status = Gitlab::Cache::Import::Caching.read(cache_key)
+
+      Gitlab::Json.parse(status) if status
+    end
+
+    def status_from_remote
+      raw_status = client.get(status_endpoint, relation: relation).parsed_response
+
+      parse_status_from_remote(raw_status).tap do |status|
+        cache_status(status) if cache_status?(status)
+      end
+    end
+
+    def parse_status_from_remote(status)
+      # Non-batched status
+      return status if status.is_a?(Hash) || status.nil?
+
+      # Batched status
+      status.find { |item| item['relation'] == relation }
+    end
+
+    def cache_status?(status)
+      status.present? && status['status'].in?([Export::FINISHED, Export::FAILED])
+    end
+
+    def cache_status(status)
+      Gitlab::Cache::Import::Caching.write(cache_key, status.to_json)
+    end
+
+    def cache_key
+      Kernel.format(CACHE_KEY, entity_id: entity.id, relation: relation)
     end
 
     def status_endpoint
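Annotation: the change above is a read-through cache in which only terminal statuses are written back. A minimal plain-Ruby sketch of the same pattern, using a `Hash` in place of `Gitlab::Cache::Import::Caching` and string statuses in place of the `Export::FINISHED`/`Export::FAILED` constants:

```ruby
require 'json'

FAKE_CACHE = {}
TERMINAL_STATUSES = %w[finished failed].freeze

def fetch_status(cache_key)
  cached = FAKE_CACHE[cache_key]
  return JSON.parse(cached) if cached

  # Stand-in for the HTTP call to the source instance's export status endpoint.
  status = { 'relation' => 'labels', 'status' => 'finished' }

  # Cache only terminal statuses; an in-flight export must always be re-fetched.
  FAKE_CACHE[cache_key] = status.to_json if TERMINAL_STATUSES.include?(status['status'])
  status
end

fetch_status('bulk_imports/export_status/1/labels') # hits the "remote" and caches
fetch_status('bulk_imports/export_status/1/labels') # served from the cache
```

Caching only finished and failed statuses is what makes this safe: a started export can still change, so its status is never pinned.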
diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
index 980cdb98f329d0ff6c8287b9dbe4400aa9f9e56d..b832f1bf2e5c4635a00e6da0f090a4fbbdca27ae 100644
--- a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
+++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
@@ -42,7 +42,7 @@ def re_enqueue
     end
 
     def import_in_progress?
-      sorted_batches.any? { |b| b.started? || b.created? }
+      sorted_batches.in_progress.any?
     end
 
     def most_recent_batch_stale?
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index bd86970844bad5c967451780b78bc14251469cd5..21d8b75aa93e459c42d95a3f10154c253f75be96 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -4,9 +4,12 @@ module BulkImports
   class PipelineWorker
     include ApplicationWorker
     include ExclusiveLeaseGuard
+    include Gitlab::Utils::StrongMemoize
 
     FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds
 
+    LimitedBatches = Struct.new(:numbers, :final?, keyword_init: true).freeze
+
     DEFER_ON_HEALTH_DELAY = 5.minutes
 
     data_consistency :always
@@ -52,7 +55,6 @@ def perform(pipeline_tracker_id, _stage, entity_id)
       try_obtain_lease do
         if pipeline_tracker.enqueued? || pipeline_tracker.started?
           logger.info(log_attributes(message: 'Pipeline starting'))
-
           run
         end
       end
@@ -84,7 +86,8 @@ def run
 
         return pipeline_tracker.finish! if export_status.batches_count < 1
 
-        enqueue_batches
+        enqueue_limited_batches
+        re_enqueue unless all_batches_enqueued?
       else
         log_extra_metadata_on_done(:batched, false)
@@ -210,22 +213,60 @@ def time_since_tracker_created
       Time.zone.now - (pipeline_tracker.created_at || entity.created_at)
     end
 
-    def lease_timeout
-      30
+    def enqueue_limited_batches
+      next_batch.numbers.each do |batch_number|
+        batch = pipeline_tracker.batches.create!(batch_number: batch_number)
+
+        with_context(bulk_import_entity_id: entity.id) do
+          ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
+        end
+      end
+
+      log_extra_metadata_on_done(:tracker_batch_numbers_enqueued, next_batch.numbers)
+      log_extra_metadata_on_done(:tracker_final_batch_was_enqueued, next_batch.final?)
     end
 
-    def lease_key
-      "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}"
+    def all_batches_enqueued?
+      next_batch.final?
     end
 
-    def enqueue_batches
-      1.upto(export_status.batches_count) do |batch_number|
-        batch = pipeline_tracker.batches.find_or_create_by!(batch_number: batch_number) # rubocop:disable CodeReuse/ActiveRecord
+    def next_batch
+      all_batch_numbers = (1..export_status.batches_count).to_a
 
-        with_context(bulk_import_entity_id: entity.id) do
-          ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
-        end
+      created_batch_numbers = pipeline_tracker.batches.pluck_batch_numbers
+
+      remaining_batch_numbers = all_batch_numbers - created_batch_numbers
+
+      if Feature.disabled?(:bulk_import_limit_concurrent_batches, context.portable)
+        return LimitedBatches.new(numbers: remaining_batch_numbers, final?: true)
       end
+
+      limit = next_batch_count
+
+      LimitedBatches.new(
+        numbers: remaining_batch_numbers.first(limit),
+        final?: remaining_batch_numbers.count <= limit
+      )
+    end
+    strong_memoize_attr :next_batch
+
+    # Calculate the number of batches, up to `batch_limit`, to process in the
+    # next round.
+    def next_batch_count
+      limit = batch_limit - pipeline_tracker.batches.in_progress.limit(batch_limit).count
+      [limit, 0].max
+    end
+
+    def batch_limit
+      ::Gitlab::CurrentSettings.bulk_import_concurrent_pipeline_batch_limit
+    end
+
+    def lease_timeout
+      30
+    end
+
+    def lease_key
+      "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}"
     end
   end
 end
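Annotation: the enqueue window computed by `next_batch` and `next_batch_count` above is plain arithmetic over batch numbers. A runnable sketch with made-up numbers (names mirror the worker):

```ruby
batch_limit   = 5                      # Gitlab::CurrentSettings value
batches_count = 12                     # from export_status
created       = [1, 2, 3, 4, 5, 6]     # batch numbers already persisted
in_progress   = 3                      # of those, still :created or :started

remaining = (1..batches_count).to_a - created  # => [7, 8, 9, 10, 11, 12]
limit     = [batch_limit - in_progress, 0].max # => 2 free slots
window    = remaining.first(limit)             # => [7, 8] get enqueued now
final     = remaining.count <= limit           # => false

puts "enqueue #{window.inspect}, final batch enqueued: #{final}"
```

Because `final` is false here, `run` calls `re_enqueue` and the worker picks up the next window once in-flight batches drain.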
diff --git a/config/feature_flags/development/bulk_import_limit_concurrent_batches.yml b/config/feature_flags/development/bulk_import_limit_concurrent_batches.yml
new file mode 100644
index 0000000000000000000000000000000000000000..4bbd0bd5773da7c595b25313923fbab14407b3eb
--- /dev/null
+++ b/config/feature_flags/development/bulk_import_limit_concurrent_batches.yml
@@ -0,0 +1,8 @@
+---
+name: bulk_import_limit_concurrent_batches
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/136018
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/431561
+milestone: '16.7'
+type: development
+group: group::import and integrate
+default_enabled: false
diff --git a/db/migrate/20231108132916_index_batch_tracker_status.rb b/db/migrate/20231108132916_index_batch_tracker_status.rb
new file mode 100644
index 0000000000000000000000000000000000000000..099cbae6fc12291b256d4e7e3f42ccdd76a9ee02
--- /dev/null
+++ b/db/migrate/20231108132916_index_batch_tracker_status.rb
@@ -0,0 +1,17 @@
+# frozen_string_literal: true
+
+class IndexBatchTrackerStatus < Gitlab::Database::Migration[2.2]
+  disable_ddl_transaction!
+
+  milestone '16.7'
+
+  INDEX_NAME = 'index_batch_trackers_on_tracker_id_status'
+
+  def up
+    add_concurrent_index :bulk_import_batch_trackers, [:tracker_id, :status], name: INDEX_NAME
+  end
+
+  def down
+    remove_concurrent_index_by_name :bulk_import_batch_trackers, INDEX_NAME
+  end
+end
diff --git a/db/migrate/20231108143957_add_concurrent_direct_transfer_batch_limit_to_application_settings.rb b/db/migrate/20231108143957_add_concurrent_direct_transfer_batch_limit_to_application_settings.rb
new file mode 100644
index 0000000000000000000000000000000000000000..064385e02fcfe0bfb2cfbfd27ea9cbf3c282cba7
--- /dev/null
+++ b/db/migrate/20231108143957_add_concurrent_direct_transfer_batch_limit_to_application_settings.rb
@@ -0,0 +1,9 @@
+# frozen_string_literal: true
+
+class AddConcurrentDirectTransferBatchLimitToApplicationSettings < Gitlab::Database::Migration[2.2]
+  milestone '16.7'
+
+  def change
+    add_column :application_settings, :bulk_import_concurrent_pipeline_batch_limit, :smallint, default: 25, null: false
+  end
+end
diff --git a/db/schema_migrations/20231108132916 b/db/schema_migrations/20231108132916
new file mode 100644
index 0000000000000000000000000000000000000000..a7c7d98f18c271e1a001eaeb6e947ddc733f9648
--- /dev/null
+++ b/db/schema_migrations/20231108132916
@@ -0,0 +1 @@
+4f67f8ebf48cb7ea22e5451c3b548a5f7dc59b0e2b29d51ac73a04860214a25f
\ No newline at end of file
diff --git a/db/schema_migrations/20231108143957 b/db/schema_migrations/20231108143957
new file mode 100644
index 0000000000000000000000000000000000000000..ec3f916ea2eb18f62905254fe51340add81c2e3f
--- /dev/null
+++ b/db/schema_migrations/20231108143957
@@ -0,0 +1 @@
+fc18cfa407a2270af8be9de77b5078544e27afb38e4ad87f3b2c06e24f58add0
\ No newline at end of file
diff --git a/db/structure.sql b/db/structure.sql
index c3c25cfe267defd234f9d3acdb47b9bd0621ee96..8c43d4a6acf017dab19fcf3b10bf1ffeb922cc4e 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -12276,6 +12276,7 @@ CREATE TABLE application_settings (
     update_namespace_name_rate_limit smallint DEFAULT 120 NOT NULL,
     pre_receive_secret_detection_enabled boolean DEFAULT false NOT NULL,
     can_create_organization boolean DEFAULT true NOT NULL,
+    bulk_import_concurrent_pipeline_batch_limit smallint DEFAULT 25 NOT NULL,
     web_ide_oauth_application_id integer,
     instance_level_ai_beta_features_enabled boolean DEFAULT false NOT NULL,
     security_txt_content text,
@@ -31873,6 +31874,8 @@ CREATE INDEX index_badges_on_group_id ON badges USING btree (group_id);
 
 CREATE INDEX index_badges_on_project_id ON badges USING btree (project_id);
 
+CREATE INDEX index_batch_trackers_on_tracker_id_status ON bulk_import_batch_trackers USING btree (tracker_id, status);
+
 CREATE INDEX index_batched_background_migrations_on_status ON batched_background_migrations USING btree (status);
 
 CREATE UNIQUE INDEX index_batched_background_migrations_on_unique_configuration ON batched_background_migrations USING btree (job_class_name, table_name, column_name, job_arguments);
diff --git a/doc/api/settings.md b/doc/api/settings.md
index 0af3444febdcfacb2e95d104453d4faae2b23089..cf34cf3d65e17d347e691f4c2ac9c79c4c340718 100644
--- a/doc/api/settings.md
+++ b/doc/api/settings.md
@@ -126,7 +126,8 @@ Example response:
   "package_registry_allow_anyone_to_pull_option": true,
   "bulk_import_max_download_file_size": 5120,
   "project_jobs_api_rate_limit": 600,
-  "security_txt_content": null
+  "security_txt_content": null,
+  "bulk_import_concurrent_pipeline_batch_limit": 25
 }
 ```
@@ -272,7 +273,8 @@ Example response:
   "package_registry_allow_anyone_to_pull_option": true,
   "bulk_import_max_download_file_size": 5120,
   "project_jobs_api_rate_limit": 600,
-  "security_txt_content": null
+  "security_txt_content": null,
+  "bulk_import_concurrent_pipeline_batch_limit": 25
 }
 ```
@@ -621,6 +623,7 @@ listed in the descriptions of the relevant settings.
 | `valid_runner_registrars` | array of strings | no | List of types which are allowed to register a GitLab Runner. Can be `[]`, `['group']`, `['project']` or `['group', 'project']`. |
 | `whats_new_variant` | string | no | What's new variant, possible values: `all_tiers`, `current_tier`, and `disabled`. |
 | `wiki_page_max_content_bytes` | integer | no | Maximum wiki page content size in **bytes**. Default: 52428800 Bytes (50 MB). The minimum value is 1024 bytes. |
+| `bulk_import_concurrent_pipeline_batch_limit` | integer | no | Maximum simultaneous Direct Transfer batches to process. |
 
 ### Configure inactive project deletion
diff --git a/lib/api/settings.rb b/lib/api/settings.rb
index 162e24af0994cc10b36b324a298ed41e65fbec22..9de9f19ccd752bc0a0a509c4378c8718f03fd436 100644
--- a/lib/api/settings.rb
+++ b/lib/api/settings.rb
@@ -212,6 +212,7 @@ def filter_attributes_using_license(attrs)
         optional :pipeline_limit_per_project_user_sha, type: Integer, desc: "Maximum number of pipeline creation requests allowed per minute per user and commit. Set to 0 for unlimited requests per minute."
         optional :jira_connect_application_key, type: String, desc: "Application ID of the OAuth application that should be used to authenticate with the GitLab for Jira Cloud app"
         optional :jira_connect_proxy_url, type: String, desc: "URL of the GitLab instance that should be used as a proxy for the GitLab for Jira Cloud app"
+        optional :bulk_import_concurrent_pipeline_batch_limit, type: Integer, desc: 'Maximum simultaneous Direct Transfer pipeline batches to process'
         optional :bulk_import_enabled, type: Boolean, desc: 'Enable migrating GitLab groups and projects by direct transfer'
         optional :bulk_import_max_download_file, type: Integer, desc: 'Maximum download file size in MB when importing from source GitLab instances by direct transfer'
         optional :allow_runner_registration_token, type: Boolean, desc: 'Allow registering runners using a registration token'
diff --git a/spec/models/application_setting_spec.rb b/spec/models/application_setting_spec.rb
index 05e3bffbeaf155988e48e607b8e2d6cd97ae87e7..d16a78be53318ff0c30b64d7eb8b4a10758e6cb4 100644
--- a/spec/models/application_setting_spec.rb
+++ b/spec/models/application_setting_spec.rb
@@ -26,6 +26,7 @@
     it { expect(setting.default_branch_protection_defaults).to eq({}) }
     it { expect(setting.max_decompressed_archive_size).to eq(25600) }
     it { expect(setting.decompress_archive_file_timeout).to eq(210) }
+    it { expect(setting.bulk_import_concurrent_pipeline_batch_limit).to eq(25) }
   end
 
   describe 'validations' do
@@ -1351,6 +1352,13 @@ def expect_invalid
           .with_message("must be a value between 0 and 1")
       end
     end
+
+    describe 'bulk_import_concurrent_pipeline_batch_limit' do
+      it do
+        is_expected.to validate_numericality_of(:bulk_import_concurrent_pipeline_batch_limit)
+          .is_greater_than(0)
+      end
+    end
   end
 
   context 'restrict creating duplicates' do
diff --git a/spec/models/bulk_imports/batch_tracker_spec.rb b/spec/models/bulk_imports/batch_tracker_spec.rb
index 336943228c742fd1079294ab5130327dcae64304..1c7cbc0cb8c4cc133056abe161cca992719f3fcf 100644
--- a/spec/models/bulk_imports/batch_tracker_spec.rb
+++ b/spec/models/bulk_imports/batch_tracker_spec.rb
@@ -13,4 +13,19 @@
     it { is_expected.to validate_presence_of(:batch_number) }
     it { is_expected.to validate_uniqueness_of(:batch_number).scoped_to(:tracker_id) }
   end
+
+  describe 'scopes' do
+    describe '.in_progress' do
+      it 'returns only batches that are in progress' do
+        created = create(:bulk_import_batch_tracker, :created)
+        started = create(:bulk_import_batch_tracker, :started)
+        create(:bulk_import_batch_tracker, :finished)
+        create(:bulk_import_batch_tracker, :timeout)
+        create(:bulk_import_batch_tracker, :failed)
+        create(:bulk_import_batch_tracker, :skipped)
+
+        expect(described_class.in_progress).to contain_exactly(created, started)
+      end
+    end
+  end
 end
diff --git a/spec/models/bulk_imports/export_status_spec.rb b/spec/models/bulk_imports/export_status_spec.rb
index e7a037b54be9765462e4c5a17e821658dee3e591..aa3bce785344cfbfeea1b11303ab15010d2305fd 100644
--- a/spec/models/bulk_imports/export_status_spec.rb
+++ b/spec/models/bulk_imports/export_status_spec.rb
@@ -2,7 +2,7 @@
 
 require 'spec_helper'
 
-RSpec.describe BulkImports::ExportStatus, feature_category: :importers do
+RSpec.describe BulkImports::ExportStatus, :clean_gitlab_redis_cache, feature_category: :importers do
   let_it_be(:relation) { 'labels' }
   let_it_be(:import) { create(:bulk_import) }
   let_it_be(:config) { create(:bulk_import_configuration, bulk_import: import) }
@@ -282,4 +282,111 @@
       end
     end
   end
+
+  describe 'caching' do
+    let(:cached_status) do
+      subject.send(:status)
+      subject.send(:status_from_cache)
+    end
+
+    shared_examples 'does not result in a cached status' do
+      specify do
+        expect(cached_status).to be_nil
+      end
+    end
+
+    shared_examples 'results in a cached status' do
+      specify do
+        expect(cached_status).to include('status' => status)
+      end
+
+      context 'when something goes wrong during export status fetch' do
+        before do
+          allow_next_instance_of(BulkImports::Clients::HTTP) do |client|
+            allow(client).to receive(:get).and_raise(
+              BulkImports::NetworkError.new("Unsuccessful response", response: nil)
+            )
+          end
+        end
+
+        include_examples 'does not result in a cached status'
+      end
+    end
+
+    context 'when export status is started' do
+      let(:status) { BulkImports::Export::STARTED }
+
+      it_behaves_like 'does not result in a cached status'
+    end
+
+    context 'when export status is failed' do
+      let(:status) { BulkImports::Export::FAILED }
+
+      it_behaves_like 'results in a cached status'
+    end
+
+    context 'when export status is finished' do
+      let(:status) { BulkImports::Export::FINISHED }
+
+      it_behaves_like 'results in a cached status'
+    end
+
+    context 'when export status is not present' do
+      let(:status) { nil }
+
+      it_behaves_like 'does not result in a cached status'
+    end
+
+    context 'when the cache is empty' do
+      let(:status) { BulkImports::Export::FAILED }
+
+      it 'fetches the status from the remote' do
+        expect(subject).to receive(:status_from_remote).and_call_original
+        expect(subject.send(:status)).to include('status' => status)
+      end
+    end
+
+    context 'when the cache is not empty' do
+      let(:status) { BulkImports::Export::FAILED }
+
+      before do
+        Gitlab::Cache::Import::Caching.write(
+          described_class.new(tracker, 'labels').send(:cache_key),
+          { 'status' => 'mock status' }.to_json
+        )
+      end
+
+      it 'does not fetch the status from the remote' do
+        expect(subject).not_to receive(:status_from_remote)
+        expect(subject.send(:status)).to eq({ 'status' => 'mock status' })
+      end
+
+      context 'with a different entity' do
+        before do
+          tracker.entity = create(:bulk_import_entity, bulk_import: import, source_full_path: 'foo')
+        end
+
+        it 'fetches the status from the remote' do
+          expect(subject).to receive(:status_from_remote).and_call_original
+          expect(subject.send(:status)).to include('status' => status)
+        end
+      end
+
+      context 'with a different relation' do
+        let_it_be(:relation) { 'merge_requests' }
+
+        let(:response_double) do
+          instance_double(HTTParty::Response, parsed_response: [
+            { 'relation' => 'labels', 'status' => status },
+            { 'relation' => 'merge_requests', 'status' => status }
+          ])
+        end
+
+        it 'fetches the status from the remote' do
+          expect(subject).to receive(:status_from_remote).and_call_original
+          expect(subject.send(:status)).to include('status' => status)
+        end
+      end
+    end
+  end
 end
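Annotation: the 'with a different entity' and 'with a different relation' contexts above pass because the cache key is scoped to both values. A runnable sketch using the `CACHE_KEY` template introduced in this MR:

```ruby
CACHE_KEY = 'bulk_imports/export_status/%{entity_id}/%{relation}'

Kernel.format(CACHE_KEY, entity_id: 1, relation: 'labels')
# => "bulk_imports/export_status/1/labels"
Kernel.format(CACHE_KEY, entity_id: 1, relation: 'merge_requests')
# => "bulk_imports/export_status/1/merge_requests"
```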
diff --git a/spec/requests/api/settings_spec.rb b/spec/requests/api/settings_spec.rb
index c304ae514a6067a76248bb323fc914f0290f15a7..4e24689c17a13d183c38cc80044dd0eefcf7569b 100644
--- a/spec/requests/api/settings_spec.rb
+++ b/spec/requests/api/settings_spec.rb
@@ -93,6 +93,7 @@
       expect(json_response['default_branch_protection_defaults']).to be_kind_of(Hash)
       expect(json_response['max_login_attempts']).to be_nil
       expect(json_response['failed_login_attempts_unlock_period_in_minutes']).to be_nil
+      expect(json_response['bulk_import_concurrent_pipeline_batch_limit']).to eq(25)
     end
   end
 
@@ -202,6 +203,7 @@
         jira_connect_proxy_url: 'http://example.com',
         bulk_import_enabled: false,
         bulk_import_max_download_file_size: 1,
+        bulk_import_concurrent_pipeline_batch_limit: 2,
         allow_runner_registration_token: true,
         user_defaults_to_private_profile: true,
         default_syntax_highlighting_theme: 2,
@@ -296,6 +298,7 @@
         expect(json_response['max_import_remote_file_size']).to be(2)
         expect(json_response['bulk_import_max_download_file_size']).to be(1)
         expect(json_response['security_txt_content']).to be(nil)
+        expect(json_response['bulk_import_concurrent_pipeline_batch_limit']).to be(2)
       end
     end
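Annotation: the new attribute is writable through the existing `PUT /application/settings` endpoint the request spec above exercises. A hedged sketch with plain `Net::HTTP` (host and token are placeholders, not part of this MR):

```ruby
require 'net/http'
require 'json'
require 'uri'

uri = URI('https://gitlab.example.com/api/v4/application/settings')
request = Net::HTTP::Put.new(uri)
request['PRIVATE-TOKEN'] = ENV.fetch('GITLAB_ADMIN_TOKEN') # admin token required
request.set_form_data('bulk_import_concurrent_pipeline_batch_limit' => '10')

response = Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| http.request(request) }
puts JSON.parse(response.body)['bulk_import_concurrent_pipeline_batch_limit'] # => 10
```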
diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb
index 09f76169a12b7694a4b849470c93cae239ff7518..bda07a956933abb354c0ab5f646f64b2d4f37880 100644
--- a/spec/workers/bulk_imports/pipeline_worker_spec.rb
+++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb
@@ -48,7 +48,7 @@ def self.abort_on_failure?
     end
   end
 
-  include_examples 'an idempotent worker' do
+  it_behaves_like 'an idempotent worker' do
     let(:job_args) { [pipeline_tracker.id, pipeline_tracker.stage, entity.id] }
 
     it 'runs the pipeline and sets tracker to finished' do
@@ -516,8 +516,8 @@ def self.relation
       end
     end
 
-    context 'when export is batched' do
-      let(:batches_count) { 2 }
+    context 'when export is batched', :aggregate_failures do
+      let(:batches_count) { 3 }
 
       before do
         allow_next_instance_of(BulkImports::ExportStatus) do |status|
@@ -527,10 +527,30 @@ def self.relation
           allow(status).to receive(:empty?).and_return(false)
           allow(status).to receive(:failed?).and_return(false)
         end
+        allow(worker).to receive(:log_extra_metadata_on_done).and_call_original
       end
 
       it 'enqueues pipeline batches' do
+        expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).exactly(3).times
+        expect(worker).to receive(:log_extra_metadata_on_done).with(:batched, true)
+        expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2, 3])
+        expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+        worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+        pipeline_tracker.reload
+
+        expect(pipeline_tracker.status_name).to eq(:started)
+        expect(pipeline_tracker.batched).to eq(true)
+        expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+        expect(described_class.jobs).to be_empty
+      end
+
+      it 'enqueues only missing pipelines batches' do
+        create(:bulk_import_batch_tracker, tracker: pipeline_tracker, batch_number: 2)
         expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+        expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 3])
+        expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
 
         worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
 
@@ -538,7 +558,8 @@ def self.relation
 
         expect(pipeline_tracker.status_name).to eq(:started)
         expect(pipeline_tracker.batched).to eq(true)
-        expect(pipeline_tracker.batches.count).to eq(batches_count)
+        expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+        expect(described_class.jobs).to be_empty
       end
 
       context 'when batches count is less than 1' do
@@ -552,6 +573,127 @@ def self.relation
           expect(pipeline_tracker.reload.status_name).to eq(:finished)
         end
       end
+
+      context 'when pipeline batch enqueuing should be limited' do
+        using RSpec::Parameterized::TableSyntax
+
+        before do
+          allow(::Gitlab::CurrentSettings).to receive(:bulk_import_concurrent_pipeline_batch_limit).and_return(2)
+        end
+
+        it 'only enqueues limited batches and reenqueues itself' do
+          expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+          expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2])
+          expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, false)
+
+          worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+          pipeline_tracker.reload
+
+          expect(pipeline_tracker.status_name).to eq(:started)
+          expect(pipeline_tracker.batched).to eq(true)
+          expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2)
+          expect(described_class.jobs).to contain_exactly(
+            hash_including(
+              'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id],
+              'scheduled_at' => be_within(1).of(10.seconds.from_now.to_i)
+            )
+          )
+        end
+
+        context 'when there is a batch in progress' do
+          where(:status) { BulkImports::BatchTracker::IN_PROGRESS_STATES }
+
+          with_them do
+            before do
+              create(:bulk_import_batch_tracker, status, batch_number: 1, tracker: pipeline_tracker)
+            end
+
+            it 'counts the in progress batch against the limit' do
+              expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).once
+              expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [2])
+              expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, false)
+
+              worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+              pipeline_tracker.reload
+
+              expect(pipeline_tracker.status_name).to eq(:started)
+              expect(pipeline_tracker.batched).to eq(true)
+              expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2)
+              expect(described_class.jobs).to contain_exactly(
+                hash_including(
+                  'args' => [pipeline_tracker.id, pipeline_tracker.stage, entity.id],
+                  'scheduled_at' => be_within(1).of(10.seconds.from_now.to_i)
+                )
+              )
+            end
+          end
+        end
+
+        context 'when there is a batch that has finished' do
+          where(:status) do
+            all_statuses = BulkImports::BatchTracker.state_machines[:status].states.map(&:name)
+            all_statuses - BulkImports::BatchTracker::IN_PROGRESS_STATES
+          end
+
+          with_them do
+            before do
+              create(:bulk_import_batch_tracker, status, batch_number: 1, tracker: pipeline_tracker)
+            end
+
+            it 'does not count the finished batch against the limit' do
+              expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+              expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [2, 3])
+              expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+              worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+              pipeline_tracker.reload
+
+              expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+              expect(described_class.jobs).to be_empty
+            end
+          end
+        end
+
+        context 'when the feature flag is disabled' do
+          before do
+            stub_feature_flags(bulk_import_limit_concurrent_batches: false)
+          end
+
+          it 'does not limit batches' do
+            expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).exactly(3).times
+            expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 2, 3])
+            expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+            worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+            pipeline_tracker.reload
+
+            expect(pipeline_tracker.status_name).to eq(:started)
+            expect(pipeline_tracker.batched).to eq(true)
+            expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+            expect(described_class.jobs).to be_empty
+          end
+
+          it 'still enqueues only missing pipelines batches' do
+            create(:bulk_import_batch_tracker, tracker: pipeline_tracker, batch_number: 2)
+            expect(BulkImports::PipelineBatchWorker).to receive(:perform_async).twice
+            expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_batch_numbers_enqueued, [1, 3])
+            expect(worker).to receive(:log_extra_metadata_on_done).with(:tracker_final_batch_was_enqueued, true)
+
+            worker.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+            pipeline_tracker.reload
+
+            expect(pipeline_tracker.status_name).to eq(:started)
+            expect(pipeline_tracker.batched).to eq(true)
+            expect(pipeline_tracker.batches.pluck_batch_numbers).to contain_exactly(1, 2, 3)
+            expect(described_class.jobs).to be_empty
+          end
+        end
+      end
     end
   end
 end
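Annotation: the `described_class.jobs` assertions in these specs rely on Sidekiq's fake testing mode, where enqueued jobs become hashes in an in-memory array instead of going to Redis. A standalone sketch of the mechanics (the worker class here is a hypothetical stand-in, not part of this MR):

```ruby
require 'sidekiq'
require 'sidekiq/testing'

class ExampleWorker # hypothetical stand-in for BulkImports::PipelineWorker
  include Sidekiq::Worker
  def perform(*); end
end

Sidekiq::Testing.fake! do
  ExampleWorker.perform_in(10, 1, 'stage', 2) # schedule 10 seconds from now

  job = ExampleWorker.jobs.last
  p job['args']         # => [1, "stage", 2]
  p job['scheduled_at'] # => epoch float roughly 10 seconds in the future
end
```

This is why the re-enqueue expectations can match on `'scheduled_at' => be_within(1).of(10.seconds.from_now.to_i)`: the 10-second delay is `FILE_EXTRACTION_PIPELINE_PERFORM_DELAY` from the worker.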