From a8d92c944111e97c12d131f98564dfa94d739ef7 Mon Sep 17 00:00:00 2001 From: Kassio Borges Date: Tue, 23 Feb 2021 15:17:55 +0000 Subject: [PATCH 1/6] BulkImports: Run migration in stages BulkImport background jobs now run in stages. Each stage runs serially (one after the other), while the pipelines within a stage run in parallel to each other. To keep track of the jobs, the `BulkImports::EntityPipelineStatus` model was created. --- app/models/bulk_imports/entity.rb | 4 ++ .../bulk_imports/entity_pipeline_status.rb | 40 ++++++++++++ app/workers/all_queues.yml | 16 +++++ app/workers/bulk_imports/pipeline_worker.rb | 64 +++++++++++++++++++ app/workers/bulk_imports/stage_worker.rb | 63 ++++++++++++++++++ config/sidekiq_queues.yml | 4 ++ ...620_create_bulk_import_entity_pipelines.rb | 40 ++++++++++++ db/schema_migrations/20210223141620 | 1 + db/structure.sql | 30 +++++++++ .../pipelines/epic_award_emoji_pipeline.rb | 3 +- .../groups/pipelines/epic_events_pipeline.rb | 5 +- .../bulk_imports/importers/group_importer.rb | 10 +-- .../groups/pipelines/entity_finisher.rb | 28 ++++++++ lib/bulk_imports/importers/group_importer.rb | 44 ++++++++++--- lib/bulk_imports/pipeline.rb | 5 +- lib/bulk_imports/pipeline/runner.rb | 3 + 16 files changed, 342 insertions(+), 18 deletions(-) create mode 100644 app/models/bulk_imports/entity_pipeline_status.rb create mode 100644 app/workers/bulk_imports/pipeline_worker.rb create mode 100644 app/workers/bulk_imports/stage_worker.rb create mode 100644 db/migrate/20210223141620_create_bulk_import_entity_pipelines.rb create mode 100644 db/schema_migrations/20210223141620 create mode 100644 lib/bulk_imports/groups/pipelines/entity_finisher.rb diff --git a/app/models/bulk_imports/entity.rb b/app/models/bulk_imports/entity.rb index ae1e36938092c1..cf49e36d8b1979 100644 --- a/app/models/bulk_imports/entity.rb +++ b/app/models/bulk_imports/entity.rb @@ -35,6 +35,10 @@ class BulkImports::Entity < ApplicationRecord inverse_of: :entity, foreign_key: 
:bulk_import_entity_id + has_many :pipeline_statuses, + class_name: 'BulkImports::EntityPipelineStatus', + foreign_key: :bulk_import_entity_id + validates :project, absence: true, if: :group validates :group, absence: true, if: :project validates :source_type, :source_full_path, :destination_name, presence: true diff --git a/app/models/bulk_imports/entity_pipeline_status.rb b/app/models/bulk_imports/entity_pipeline_status.rb new file mode 100644 index 00000000000000..24ce941ca70670 --- /dev/null +++ b/app/models/bulk_imports/entity_pipeline_status.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +module BulkImports + class EntityPipelineStatus < ApplicationRecord + self.table_name = 'bulk_import_entity_pipeline_statuses' + + belongs_to :entity, + foreign_key: :bulk_import_entity_id, + class_name: 'BulkImports::Entity', + optional: false + + validates :stage_order, presence: true + validates :pipeline_name, presence: true + + state_machine :status, initial: :created do + state :created, value: 0 + state :started, value: 1 + state :finished, value: 2 + state :failed, value: -1 + state :skipped, value: -2 + + event :start do + transition created: :started + end + + event :finish do + transition started: :finished + transition failed: :failed + end + + event :skip do + transition any => :skipped + end + + event :fail_op do + transition any => :failed + end + end + end +end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index bded69e7f47cfa..8083ab572ae41e 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -1469,6 +1469,22 @@ :weight: 1 :idempotent: :tags: [] +- :name: bulk_imports_pipeline + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: + :tags: [] +- :name: bulk_imports_stage + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: + :tags: 
[] - :name: chat_notification :feature_category: :chatops :has_external_dependencies: true diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb new file mode 100644 index 00000000000000..c83c4a6e043dfb --- /dev/null +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +module BulkImports + class PipelineWorker + include ApplicationWorker + + feature_category :importers + + sidekiq_options retry: false, dead: false + + worker_has_external_dependencies! + + def perform(pipeline_name, entity_id) + entity = ::BulkImports::Entity.find(entity_id) + + pipeline_status = entity + .pipeline_statuses + .with_status(:created) + .find_by(pipeline_name: pipeline_name) + + logger.info(log_params( + entity_id: entity_id, + pipeline_name: pipeline_name + )) + + if pipeline_status.present? + run(pipeline_status) + else + logger.error(log_params( + entity_id: entity_id, + pipeline_name: pipeline_name, + message: 'Pipeline does not exist' + )) + end + end + + def run(pipeline_status) + pipeline_status.start! + + context = ::BulkImports::Pipeline::Context.new(pipeline_status.entity) + pipeline_class = pipeline_status.pipeline_name.constantize + + pipeline_class.new(context, pipeline_status).run + + pipeline_status.finish! + rescue => e + pipeline_status.fail_op! 
+ + logger.error(log_params( + entity_id: pipeline_status.entity.id, + pipeline_name: pipeline_status.name, + message: e.message + )) + end + + def log_params(extra = {}) + { worker: self.class.name }.merge(extra) + end + + def logger + @logger ||= Gitlab::Import::Logger.build + end + end +end diff --git a/app/workers/bulk_imports/stage_worker.rb b/app/workers/bulk_imports/stage_worker.rb new file mode 100644 index 00000000000000..20ee72b1dd785f --- /dev/null +++ b/app/workers/bulk_imports/stage_worker.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +module BulkImports + class StageWorker + include ApplicationWorker + + feature_category :importers + + sidekiq_options retry: false, dead: false + + worker_has_external_dependencies! + + TIMEOUT = 3.seconds + + def perform(entity_id, current_stage = nil) + logger.info( + worker: self.class.name, + entity_id: entity_id, + current_stage: current_stage + ) + + entity = BulkImports::Entity.find(entity_id) + + if current_stage && stage_running?(entity, current_stage) + self.class.perform_in(TIMEOUT, entity_id, current_stage) + else + # TODO: It might be better to have separate tables for stages and pipelines + # This would improve this query, since we would be able to fetch the + # stages to be processed before fetching the pipelines + stages = entity + .pipeline_statuses + .with_status(:created) + .order(:stage_order) + .group_by(&:stage_order) + + return if stages.blank? + + stage, pipelines = stages.first + pipelines.each do |pipeline| + BulkImports::PipelineWorker.perform_async(pipeline.pipeline_name, entity_id) + end + + self.class.perform_in(TIMEOUT, entity_id, stage) + end + rescue => e + logger.error(worker: self.class.name, message: e.message) + + raise e + end + + def stage_running?(entity, stage) + entity + .pipeline_statuses + .where(stage_order: stage) + .with_statuses(:created, :started) + .exists? 
+ end + + def logger + @logger ||= Gitlab::Import::Logger + end + end +end diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index e7485540dc4f71..ee3f64874d36b2 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -54,6 +54,10 @@ - 1 - - bulk_imports_entity - 1 +- - bulk_imports_pipeline + - 1 +- - bulk_imports_stage + - 1 - - chaos - 2 - - chat_notification diff --git a/db/migrate/20210223141620_create_bulk_import_entity_pipelines.rb b/db/migrate/20210223141620_create_bulk_import_entity_pipelines.rb new file mode 100644 index 00000000000000..324ac957aa2d96 --- /dev/null +++ b/db/migrate/20210223141620_create_bulk_import_entity_pipelines.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +class CreateBulkImportEntityPipelines < ActiveRecord::Migration[6.0] + include Gitlab::Database::MigrationHelpers + + DOWNTIME = false + + disable_ddl_transaction! + + def up + with_lock_retries do + unless table_exists?(:bulk_import_entity_pipeline_statuses) + create_table :bulk_import_entity_pipeline_statuses do |t| + t.references :bulk_import_entity, + index: false, + null: false, + foreign_key: { on_delete: :cascade } + + t.text :pipeline_name + + t.integer :stage_order, null: false, limit: 2 + t.integer :status, null: false, limit: 2 + + t.text :jid + + t.timestamps_with_timezone + end + end + end + + add_text_limit(:bulk_import_entity_pipeline_statuses, :pipeline_name, 255) + add_text_limit(:bulk_import_entity_pipeline_statuses, :jid, 255) + end + + def down + with_lock_retries do + drop_table :bulk_import_entity_pipeline_statuses + end + end +end diff --git a/db/schema_migrations/20210223141620 b/db/schema_migrations/20210223141620 new file mode 100644 index 00000000000000..67536a1d5caa47 --- /dev/null +++ b/db/schema_migrations/20210223141620 @@ -0,0 +1 @@ +9881e1236f4fa3b9944beba5ffbcbb4e5d8ad5144582e98f120ef722145f4534 \ No newline at end of file diff --git a/db/structure.sql b/db/structure.sql index 08d79ddf007863..e08075891c22c7 
100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -10057,6 +10057,28 @@ CREATE SEQUENCE bulk_import_entities_id_seq ALTER SEQUENCE bulk_import_entities_id_seq OWNED BY bulk_import_entities.id; +CREATE TABLE bulk_import_entity_pipeline_statuses ( + id bigint NOT NULL, + bulk_import_entity_id bigint NOT NULL, + pipeline_name text, + stage_order smallint NOT NULL, + status smallint NOT NULL, + jid text, + created_at timestamp with time zone NOT NULL, + updated_at timestamp with time zone NOT NULL, + CONSTRAINT check_aa7dc757dd CHECK ((char_length(pipeline_name) <= 255)), + CONSTRAINT check_b6f308cfff CHECK ((char_length(jid) <= 255)) +); + +CREATE SEQUENCE bulk_import_entity_pipeline_statuses_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + +ALTER SEQUENCE bulk_import_entity_pipeline_statuses_id_seq OWNED BY bulk_import_entity_pipeline_statuses.id; + CREATE TABLE bulk_import_failures ( id bigint NOT NULL, bulk_import_entity_id bigint NOT NULL, @@ -18822,6 +18844,8 @@ ALTER TABLE ONLY bulk_import_configurations ALTER COLUMN id SET DEFAULT nextval( ALTER TABLE ONLY bulk_import_entities ALTER COLUMN id SET DEFAULT nextval('bulk_import_entities_id_seq'::regclass); +ALTER TABLE ONLY bulk_import_entity_pipeline_statuses ALTER COLUMN id SET DEFAULT nextval('bulk_import_entity_pipeline_statuses_id_seq'::regclass); + ALTER TABLE ONLY bulk_import_failures ALTER COLUMN id SET DEFAULT nextval('bulk_import_failures_id_seq'::regclass); ALTER TABLE ONLY bulk_import_trackers ALTER COLUMN id SET DEFAULT nextval('bulk_import_trackers_id_seq'::regclass); @@ -19916,6 +19940,9 @@ ALTER TABLE ONLY bulk_import_configurations ALTER TABLE ONLY bulk_import_entities ADD CONSTRAINT bulk_import_entities_pkey PRIMARY KEY (id); +ALTER TABLE ONLY bulk_import_entity_pipeline_statuses + ADD CONSTRAINT bulk_import_entity_pipeline_statuses_pkey PRIMARY KEY (id); + ALTER TABLE ONLY bulk_import_failures ADD CONSTRAINT bulk_import_failures_pkey PRIMARY KEY (id); @@ 
-25915,6 +25942,9 @@ ALTER TABLE ONLY repository_languages ALTER TABLE ONLY dependency_proxy_manifests ADD CONSTRAINT fk_rails_a758021fb0 FOREIGN KEY (group_id) REFERENCES namespaces(id) ON DELETE CASCADE; +ALTER TABLE ONLY bulk_import_entity_pipeline_statuses + ADD CONSTRAINT fk_rails_a778dd94b5 FOREIGN KEY (bulk_import_entity_id) REFERENCES bulk_import_entities(id) ON DELETE CASCADE; + ALTER TABLE ONLY resource_milestone_events ADD CONSTRAINT fk_rails_a788026e85 FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE; diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb index efc4e8f2e3c235..50777be5ac9050 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb @@ -16,8 +16,9 @@ class EpicAwardEmojiPipeline loader EE::BulkImports::Groups::Loaders::EpicAwardEmojiLoader # rubocop: disable CodeReuse/ActiveRecord - def initialize(context) + def initialize(context, pipeline_status) @context = context + @pipeline_status = pipeline_status @group = context.group @epic_iids = @group.epics.order(iid: :desc).pluck(:iid) diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb index e0d558c78cd616..61bd98f8ce459e 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb @@ -8,13 +8,14 @@ class EpicEventsPipeline include ::BulkImports::Pipeline extractor ::BulkImports::Common::Extractors::GraphqlExtractor, - query: EE::BulkImports::Groups::Graphql::GetEpicEventsQuery + query: EE::BulkImports::Groups::Graphql::GetEpicEventsQuery transformer ::BulkImports::Common::Transformers::ProhibitedAttributesTransformer transformer ::BulkImports::Common::Transformers::UserReferenceTransformer, reference: 'author' 
- def initialize(context) + def initialize(context, pipeline_status) @context = context + @pipeline_status = pipeline_status @group = context.group @epic_iids = @group.epics.order(iid: :desc).pluck(:iid) # rubocop: disable CodeReuse/ActiveRecord diff --git a/ee/lib/ee/bulk_imports/importers/group_importer.rb b/ee/lib/ee/bulk_imports/importers/group_importer.rb index a4e04acda4a653..111b6d2cfef73f 100644 --- a/ee/lib/ee/bulk_imports/importers/group_importer.rb +++ b/ee/lib/ee/bulk_imports/importers/group_importer.rb @@ -8,12 +8,14 @@ module GroupImporter private - override :pipelines - def pipelines + override :stages_list + def stages_list super + [ EE::BulkImports::Groups::Pipelines::EpicsPipeline, - EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline, - EE::BulkImports::Groups::Pipelines::EpicEventsPipeline + [ + EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline, + EE::BulkImports::Groups::Pipelines::EpicEventsPipeline + ] ] end end diff --git a/lib/bulk_imports/groups/pipelines/entity_finisher.rb b/lib/bulk_imports/groups/pipelines/entity_finisher.rb new file mode 100644 index 00000000000000..06e7011173bca3 --- /dev/null +++ b/lib/bulk_imports/groups/pipelines/entity_finisher.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Pipelines + class EntityFinisher + def initialize(context, pipeline_status) + @context = context + @pipeline_status = pipeline_status + end + + def run + context.entity.finish! 
+ + logger.info(entity_id: context.entity.id, message: 'Entity Finished') + end + + private + + attr_reader :context, :pipeline_status + + def logger + @logger ||= Gitlab::Import::Logger.build + end + end + end + end +end diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb index f967b7ad7ab7c9..93e35a689a4e75 100644 --- a/lib/bulk_imports/importers/group_importer.rb +++ b/lib/bulk_imports/importers/group_importer.rb @@ -8,25 +8,51 @@ def initialize(entity) end def execute - context = BulkImports::Pipeline::Context.new(entity) - - pipelines.each { |pipeline| pipeline.new(context).run } - - entity.finish! + logger.info(importer: self.class.name, message: 'start', entity_id: entity.id) + + stages = stages_list + + # Store all the pipelines to run in the database + # They have a order column, which indicates the order they'll run + # Pipelines of the same entity in the same order number will run in parallel + stages.each.with_index do |pipelines, stage_order| + Array.wrap(pipelines).each do |pipeline| + entity.pipeline_statuses.create!( + pipeline_name: pipeline.name, + stage_order: stage_order + ) + end + end + + # Ensure to be the last Pipeline to run + entity.pipeline_statuses.create!( + pipeline_name: BulkImports::Groups::Pipelines::EntityFinisher, + stage_order: stages.length + ) + + BulkImports::StageWorker.perform_async(entity.id) end private attr_reader :entity - def pipelines + # List of pipelines to run + # Pipelines grouped together in a sub-array will run in parallel + def stages_list [ BulkImports::Groups::Pipelines::GroupPipeline, - BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, - BulkImports::Groups::Pipelines::MembersPipeline, - BulkImports::Groups::Pipelines::LabelsPipeline + [ + BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, + BulkImports::Groups::Pipelines::MembersPipeline, + BulkImports::Groups::Pipelines::LabelsPipeline + ] ] end + + def logger + @logger ||= 
Gitlab::Import::Logger.build + end end end end diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb index f8e5edc27d835a..60613ea3a5e522 100644 --- a/lib/bulk_imports/pipeline.rb +++ b/lib/bulk_imports/pipeline.rb @@ -7,14 +7,15 @@ module Pipeline include Gitlab::ClassAttributes include Runner - def initialize(context) + def initialize(context, pipeline_status) @context = context + @pipeline_status = pipeline_status end included do private - attr_reader :context + attr_reader :context, :pipeline_status # Fetch pipeline extractor. # An extractor is defined either by instance `#extract(context)` method diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index d39f4121b51b66..1c3784f5e8f201 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -64,6 +64,7 @@ def extracted_data_from def mark_as_failed warn(message: 'Pipeline failed', pipeline_class: pipeline) + pipeline_status.fail_op! context.entity.fail_op! end @@ -79,6 +80,8 @@ def log_skip(extra = {}) pipeline_class: pipeline }.merge(extra) + pipeline_status.skip! 
+ info(log) end -- GitLab From 5cc9d83917004b3b2d2d919862c956ecc8cece18 Mon Sep 17 00:00:00 2001 From: Kassio Borges Date: Thu, 25 Feb 2021 09:11:00 +0000 Subject: [PATCH 2/6] Better way to fetch next stage pipelines --- app/models/bulk_imports/entity.rb | 6 ++++++ app/workers/bulk_imports/stage_worker.rb | 18 +++++------------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/app/models/bulk_imports/entity.rb b/app/models/bulk_imports/entity.rb index cf49e36d8b1979..2617d5440361bb 100644 --- a/app/models/bulk_imports/entity.rb +++ b/app/models/bulk_imports/entity.rb @@ -91,6 +91,12 @@ def next_page_for(relation) trackers.find_by(relation: relation)&.next_page end + def next_created_stage + pipeline_statuses.where( + stage_order: pipeline_statuses.select('MIN(stage_order)').with_status(:created) + ) + end + private def validate_parent_is_a_group diff --git a/app/workers/bulk_imports/stage_worker.rb b/app/workers/bulk_imports/stage_worker.rb index 20ee72b1dd785f..7316152197f691 100644 --- a/app/workers/bulk_imports/stage_worker.rb +++ b/app/workers/bulk_imports/stage_worker.rb @@ -24,23 +24,15 @@ def perform(entity_id, current_stage = nil) if current_stage && stage_running?(entity, current_stage) self.class.perform_in(TIMEOUT, entity_id, current_stage) else - # TODO: It might be better to have separate tables for stages and pipelines - # This would improve this query, since we would be able to fetch the - # stages to be processed before fetching the pipelines - stages = entity - .pipeline_statuses - .with_status(:created) - .order(:stage_order) - .group_by(&:stage_order) - - return if stages.blank? - - stage, pipelines = stages.first + pipelines = entity.next_created_stage + + return if pipelines.blank? 
+ pipelines.each do |pipeline| BulkImports::PipelineWorker.perform_async(pipeline.pipeline_name, entity_id) end - self.class.perform_in(TIMEOUT, entity_id, stage) + self.class.perform_in(TIMEOUT, entity_id, pipelines.first.stage_order) end rescue => e logger.error(worker: self.class.name, message: e.message) -- GitLab From 0cc2f4c4538d4c70af26f86d101ed0d18af407b0 Mon Sep 17 00:00:00 2001 From: Kassio Borges Date: Thu, 25 Feb 2021 11:11:12 +0000 Subject: [PATCH 3/6] Use bulk_import_trackers --- app/models/bulk_imports/entity.rb | 34 +++++----------- .../bulk_imports/entity_pipeline_status.rb | 40 ------------------- app/models/bulk_imports/tracker.rb | 32 +++++++++++++-- app/workers/bulk_imports/pipeline_worker.rb | 39 ++++++++---------- app/workers/bulk_imports/stage_worker.rb | 26 +++++------- ...eline_and_stage_to_bulk_import_trackers.rb | 36 +++++++++++++++++ ...620_create_bulk_import_entity_pipelines.rb | 40 ------------------- db/structure.sql | 36 +++-------------- .../graphql/get_epic_award_emoji_query.rb | 3 +- .../groups/graphql/get_epic_events_query.rb | 3 +- .../groups/graphql/get_epics_query.rb | 2 +- .../pipelines/epic_award_emoji_pipeline.rb | 10 ++--- .../groups/pipelines/epic_events_pipeline.rb | 10 ++--- .../groups/pipelines/epics_pipeline.rb | 3 +- .../bulk_imports/importers/group_importer.rb | 4 +- .../groups/graphql/get_labels_query.rb | 2 +- .../groups/graphql/get_members_query.rb | 2 +- .../groups/pipelines/entity_finisher.rb | 5 +-- .../groups/pipelines/labels_pipeline.rb | 3 +- .../groups/pipelines/members_pipeline.rb | 3 +- lib/bulk_imports/importers/group_importer.rb | 29 +++++++++----- lib/bulk_imports/pipeline.rb | 6 +-- lib/bulk_imports/pipeline/context.rb | 15 +++++-- lib/bulk_imports/pipeline/runner.rb | 6 ++- 24 files changed, 162 insertions(+), 227 deletions(-) delete mode 100644 app/models/bulk_imports/entity_pipeline_status.rb create mode 100644 db/migrate/20210223141620_add_pipeline_and_stage_to_bulk_import_trackers.rb delete 
mode 100644 db/migrate/20210223141620_create_bulk_import_entity_pipelines.rb diff --git a/app/models/bulk_imports/entity.rb b/app/models/bulk_imports/entity.rb index 2617d5440361bb..30aa50ae2dd174 100644 --- a/app/models/bulk_imports/entity.rb +++ b/app/models/bulk_imports/entity.rb @@ -35,10 +35,6 @@ class BulkImports::Entity < ApplicationRecord inverse_of: :entity, foreign_key: :bulk_import_entity_id - has_many :pipeline_statuses, - class_name: 'BulkImports::EntityPipelineStatus', - foreign_key: :bulk_import_entity_id - validates :project, absence: true, if: :group validates :group, absence: true, if: :project validates :source_type, :source_full_path, :destination_name, presence: true @@ -72,29 +68,17 @@ class BulkImports::Entity < ApplicationRecord end end - def update_tracker_for(relation:, has_next_page:, next_page: nil) - attributes = { - relation: relation, - has_next_page: has_next_page, - next_page: next_page, - bulk_import_entity_id: id - } - - trackers.upsert(attributes, unique_by: %i[bulk_import_entity_id relation]) - end - - def has_next_page?(relation) - trackers.find_by(relation: relation)&.has_next_page - end - - def next_page_for(relation) - trackers.find_by(relation: relation)&.next_page + def next_stage_pipelines + trackers.where( + stage: trackers.select('MIN(stage)').with_status(:created) + ) end - def next_created_stage - pipeline_statuses.where( - stage_order: pipeline_statuses.select('MIN(stage_order)').with_status(:created) - ) + def stage_running?(stage) + trackers + .where(stage: stage) + .with_statuses(:created, :started) + .exists? 
end private diff --git a/app/models/bulk_imports/entity_pipeline_status.rb b/app/models/bulk_imports/entity_pipeline_status.rb deleted file mode 100644 index 24ce941ca70670..00000000000000 --- a/app/models/bulk_imports/entity_pipeline_status.rb +++ /dev/null @@ -1,40 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - class EntityPipelineStatus < ApplicationRecord - self.table_name = 'bulk_import_entity_pipeline_statuses' - - belongs_to :entity, - foreign_key: :bulk_import_entity_id, - class_name: 'BulkImports::Entity', - optional: false - - validates :stage_order, presence: true - validates :pipeline_name, presence: true - - state_machine :status, initial: :created do - state :created, value: 0 - state :started, value: 1 - state :finished, value: 2 - state :failed, value: -1 - state :skipped, value: -2 - - event :start do - transition created: :started - end - - event :finish do - transition started: :finished - transition failed: :failed - end - - event :skip do - transition any => :skipped - end - - event :fail_op do - transition any => :failed - end - end - end -end diff --git a/app/models/bulk_imports/tracker.rb b/app/models/bulk_imports/tracker.rb index 02e0904e1afd0f..a01e3b8d0d832e 100644 --- a/app/models/bulk_imports/tracker.rb +++ b/app/models/bulk_imports/tracker.rb @@ -10,9 +10,33 @@ class BulkImports::Tracker < ApplicationRecord foreign_key: :bulk_import_entity_id, optional: false - validates :relation, - presence: true, - uniqueness: { scope: :bulk_import_entity_id } - validates :next_page, presence: { if: :has_next_page? 
} + + validates :stage, presence: true + validates :pipeline_name, presence: true + + state_machine :status, initial: :created do + state :created, value: 0 + state :started, value: 1 + state :finished, value: 2 + state :failed, value: -1 + state :skipped, value: -2 + + event :start do + transition created: :started + end + + event :finish do + transition started: :finished + transition failed: :failed + end + + event :skip do + transition any => :skipped + end + + event :fail_op do + transition any => :failed + end + end end diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index c83c4a6e043dfb..84b265f1ee7d42 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -10,21 +10,16 @@ class PipelineWorker worker_has_external_dependencies! - def perform(pipeline_name, entity_id) - entity = ::BulkImports::Entity.find(entity_id) + def perform(tracker_id) + tracker = ::BulkImports::Tracker.with_status(:created).find(tracker_id) - pipeline_status = entity - .pipeline_statuses - .with_status(:created) - .find_by(pipeline_name: pipeline_name) - - logger.info(log_params( - entity_id: entity_id, - pipeline_name: pipeline_name - )) + if tracker.present? + logger.info(log_params( + entity_id: tracker.entity.id, + pipeline_name: tracker.pipeline_name + )) - if pipeline_status.present? - run(pipeline_status) + run(tracker) else logger.error(log_params( entity_id: entity_id, @@ -34,21 +29,21 @@ def perform(pipeline_name, entity_id) end end - def run(pipeline_status) - pipeline_status.start! 
+ def run(tracker) + tracker.update!(status_event: 'start', jid: jid) - context = ::BulkImports::Pipeline::Context.new(pipeline_status.entity) - pipeline_class = pipeline_status.pipeline_name.constantize + context = ::BulkImports::Pipeline::Context.new(tracker) + pipeline_class = tracker.pipeline_name.constantize - pipeline_class.new(context, pipeline_status).run + pipeline_class.new(context).run - pipeline_status.finish! + tracker.finish! rescue => e - pipeline_status.fail_op! + tracker.fail_op! logger.error(log_params( - entity_id: pipeline_status.entity.id, - pipeline_name: pipeline_status.name, + entity_id: tracker.entity.id, + pipeline_name: tracker.pipeline_name, message: e.message )) end diff --git a/app/workers/bulk_imports/stage_worker.rb b/app/workers/bulk_imports/stage_worker.rb index 7316152197f691..4cc6538c18d358 100644 --- a/app/workers/bulk_imports/stage_worker.rb +++ b/app/workers/bulk_imports/stage_worker.rb @@ -10,7 +10,7 @@ class StageWorker worker_has_external_dependencies! - TIMEOUT = 3.seconds + TIMEOUT = 5.seconds def perform(entity_id, current_stage = nil) logger.info( @@ -21,18 +21,22 @@ def perform(entity_id, current_stage = nil) entity = BulkImports::Entity.find(entity_id) - if current_stage && stage_running?(entity, current_stage) + if current_stage && entity.stage_running?(current_stage) self.class.perform_in(TIMEOUT, entity_id, current_stage) else - pipelines = entity.next_created_stage + trackers = entity.next_stage_pipelines - return if pipelines.blank? + return if trackers.blank? 
- pipelines.each do |pipeline| - BulkImports::PipelineWorker.perform_async(pipeline.pipeline_name, entity_id) + trackers.each do |tracker| + BulkImports::PipelineWorker.perform_async(tracker.id) end - self.class.perform_in(TIMEOUT, entity_id, pipelines.first.stage_order) + self.class.perform_in( + TIMEOUT, + entity_id, + trackers.first.stage + ) end rescue => e logger.error(worker: self.class.name, message: e.message) @@ -40,14 +44,6 @@ def perform(entity_id, current_stage = nil) raise e end - def stage_running?(entity, stage) - entity - .pipeline_statuses - .where(stage_order: stage) - .with_statuses(:created, :started) - .exists? - end - def logger @logger ||= Gitlab::Import::Logger end diff --git a/db/migrate/20210223141620_add_pipeline_and_stage_to_bulk_import_trackers.rb b/db/migrate/20210223141620_add_pipeline_and_stage_to_bulk_import_trackers.rb new file mode 100644 index 00000000000000..b0eb6cd3d81b01 --- /dev/null +++ b/db/migrate/20210223141620_add_pipeline_and_stage_to_bulk_import_trackers.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +class AddPipelineAndStageToBulkImportTrackers < ActiveRecord::Migration[6.0] + include Gitlab::Database::MigrationHelpers + + DOWNTIME = false + + disable_ddl_transaction! 
+ + def up + unless column_exists?(:bulk_import_failures, :pipeline_name, :text) + with_lock_retries do + add_column :bulk_import_trackers, :jid, :text + add_column :bulk_import_trackers, :pipeline_name, :text + add_column :bulk_import_trackers, :stage, :integer, null: false, limit: 2 + add_column :bulk_import_trackers, :status, :integer, null: false, limit: 2 + + change_column_null :bulk_import_trackers, :relation, false + end + end + + add_text_limit :bulk_import_trackers, :pipeline_name, 255 + add_text_limit :bulk_import_trackers, :jid, 255 + end + + def down + with_lock_retries do + remove_column :bulk_import_trackers, :jid, :text + remove_column :bulk_import_trackers, :pipeline_name, :text + remove_column :bulk_import_trackers, :stage, :integer + remove_column :bulk_import_trackers, :status, :integer + + change_column_null :bulk_import_trackers, :relation, true + end + end +end diff --git a/db/migrate/20210223141620_create_bulk_import_entity_pipelines.rb b/db/migrate/20210223141620_create_bulk_import_entity_pipelines.rb deleted file mode 100644 index 324ac957aa2d96..00000000000000 --- a/db/migrate/20210223141620_create_bulk_import_entity_pipelines.rb +++ /dev/null @@ -1,40 +0,0 @@ -# frozen_string_literal: true - -class CreateBulkImportEntityPipelines < ActiveRecord::Migration[6.0] - include Gitlab::Database::MigrationHelpers - - DOWNTIME = false - - disable_ddl_transaction! 
- - def up - with_lock_retries do - unless table_exists?(:bulk_import_entity_pipeline_statuses) - create_table :bulk_import_entity_pipeline_statuses do |t| - t.references :bulk_import_entity, - index: false, - null: false, - foreign_key: { on_delete: :cascade } - - t.text :pipeline_name - - t.integer :stage_order, null: false, limit: 2 - t.integer :status, null: false, limit: 2 - - t.text :jid - - t.timestamps_with_timezone - end - end - end - - add_text_limit(:bulk_import_entity_pipeline_statuses, :pipeline_name, 255) - add_text_limit(:bulk_import_entity_pipeline_statuses, :jid, 255) - end - - def down - with_lock_retries do - drop_table :bulk_import_entity_pipeline_statuses - end - end -end diff --git a/db/structure.sql b/db/structure.sql index e08075891c22c7..408d8c3d6f7ebe 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -10057,28 +10057,6 @@ CREATE SEQUENCE bulk_import_entities_id_seq ALTER SEQUENCE bulk_import_entities_id_seq OWNED BY bulk_import_entities.id; -CREATE TABLE bulk_import_entity_pipeline_statuses ( - id bigint NOT NULL, - bulk_import_entity_id bigint NOT NULL, - pipeline_name text, - stage_order smallint NOT NULL, - status smallint NOT NULL, - jid text, - created_at timestamp with time zone NOT NULL, - updated_at timestamp with time zone NOT NULL, - CONSTRAINT check_aa7dc757dd CHECK ((char_length(pipeline_name) <= 255)), - CONSTRAINT check_b6f308cfff CHECK ((char_length(jid) <= 255)) -); - -CREATE SEQUENCE bulk_import_entity_pipeline_statuses_id_seq - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - -ALTER SEQUENCE bulk_import_entity_pipeline_statuses_id_seq OWNED BY bulk_import_entity_pipeline_statuses.id; - CREATE TABLE bulk_import_failures ( id bigint NOT NULL, bulk_import_entity_id bigint NOT NULL, @@ -10110,8 +10088,14 @@ CREATE TABLE bulk_import_trackers ( relation text NOT NULL, next_page text, has_next_page boolean DEFAULT false NOT NULL, + jid text, + pipeline_name text, + stage smallint NOT NULL, + status 
smallint NOT NULL, CONSTRAINT check_2d45cae629 CHECK ((char_length(relation) <= 255)), + CONSTRAINT check_328a4e67b0 CHECK ((char_length(pipeline_name) <= 255)), CONSTRAINT check_40aeaa600b CHECK ((char_length(next_page) <= 255)), + CONSTRAINT check_603f91cb06 CHECK ((char_length(jid) <= 255)), CONSTRAINT check_next_page_requirement CHECK (((has_next_page IS FALSE) OR (next_page IS NOT NULL))) ); @@ -18844,8 +18828,6 @@ ALTER TABLE ONLY bulk_import_configurations ALTER COLUMN id SET DEFAULT nextval( ALTER TABLE ONLY bulk_import_entities ALTER COLUMN id SET DEFAULT nextval('bulk_import_entities_id_seq'::regclass); -ALTER TABLE ONLY bulk_import_entity_pipeline_statuses ALTER COLUMN id SET DEFAULT nextval('bulk_import_entity_pipeline_statuses_id_seq'::regclass); - ALTER TABLE ONLY bulk_import_failures ALTER COLUMN id SET DEFAULT nextval('bulk_import_failures_id_seq'::regclass); ALTER TABLE ONLY bulk_import_trackers ALTER COLUMN id SET DEFAULT nextval('bulk_import_trackers_id_seq'::regclass); @@ -19940,9 +19922,6 @@ ALTER TABLE ONLY bulk_import_configurations ALTER TABLE ONLY bulk_import_entities ADD CONSTRAINT bulk_import_entities_pkey PRIMARY KEY (id); -ALTER TABLE ONLY bulk_import_entity_pipeline_statuses - ADD CONSTRAINT bulk_import_entity_pipeline_statuses_pkey PRIMARY KEY (id); - ALTER TABLE ONLY bulk_import_failures ADD CONSTRAINT bulk_import_failures_pkey PRIMARY KEY (id); @@ -25942,9 +25921,6 @@ ALTER TABLE ONLY repository_languages ALTER TABLE ONLY dependency_proxy_manifests ADD CONSTRAINT fk_rails_a758021fb0 FOREIGN KEY (group_id) REFERENCES namespaces(id) ON DELETE CASCADE; -ALTER TABLE ONLY bulk_import_entity_pipeline_statuses - ADD CONSTRAINT fk_rails_a778dd94b5 FOREIGN KEY (bulk_import_entity_id) REFERENCES bulk_import_entities(id) ON DELETE CASCADE; - ALTER TABLE ONLY resource_milestone_events ADD CONSTRAINT fk_rails_a788026e85 FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE; diff --git 
a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb index 8aa9fb8da2f4b6..39247565c873b2 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb @@ -32,11 +32,10 @@ def to_s def variables(context) iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_award_emoji" { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(tracker), + cursor: context.next_page, epic_iid: iid } end diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb index 1e93d3220eadc0..2fd36ee9257c41 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb @@ -34,11 +34,10 @@ def to_s def variables(context) iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_events" { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(tracker), + cursor: context.next_page, epic_iid: iid } end diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb index b292e6f631946e..7c22e84a322190 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb @@ -60,7 +60,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:epics) + cursor: context.next_page } end diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb index 50777be5ac9050..723f171e7b689f 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb +++ 
b/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb @@ -16,9 +16,9 @@ class EpicAwardEmojiPipeline loader EE::BulkImports::Groups::Loaders::EpicAwardEmojiLoader # rubocop: disable CodeReuse/ActiveRecord - def initialize(context, pipeline_status) + def initialize(context) @context = context - @pipeline_status = pipeline_status + @tracker = context.tracker @group = context.group @epic_iids = @group.epics.order(iid: :desc).pluck(:iid) @@ -26,11 +26,7 @@ def initialize(context, pipeline_status) end def after_run(extracted_data) - iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_award_emoji" - - context.entity.update_tracker_for( - relation: tracker, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb index 61bd98f8ce459e..8e13fb23fa394e 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb @@ -13,9 +13,9 @@ class EpicEventsPipeline transformer ::BulkImports::Common::Transformers::ProhibitedAttributesTransformer transformer ::BulkImports::Common::Transformers::UserReferenceTransformer, reference: 'author' - def initialize(context, pipeline_status) + def initialize(context) @context = context - @pipeline_status = pipeline_status + @tracker = context.tracker @group = context.group @epic_iids = @group.epics.order(iid: :desc).pluck(:iid) # rubocop: disable CodeReuse/ActiveRecord @@ -49,11 +49,7 @@ def load(context, data) end def after_run(extracted_data) - iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_events" - - context.entity.update_tracker_for( - relation: tracker, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb 
b/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb index 6b6f1cf3e1f639..67df16495f7945 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb @@ -16,8 +16,7 @@ class EpicsPipeline loader EE::BulkImports::Groups::Loaders::EpicsLoader def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :epics, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/ee/lib/ee/bulk_imports/importers/group_importer.rb b/ee/lib/ee/bulk_imports/importers/group_importer.rb index 111b6d2cfef73f..f89761d3af34b5 100644 --- a/ee/lib/ee/bulk_imports/importers/group_importer.rb +++ b/ee/lib/ee/bulk_imports/importers/group_importer.rb @@ -8,8 +8,8 @@ module GroupImporter private - override :stages_list - def stages_list + override :stage_list + def stage_list super + [ EE::BulkImports::Groups::Pipelines::EpicsPipeline, [ diff --git a/lib/bulk_imports/groups/graphql/get_labels_query.rb b/lib/bulk_imports/groups/graphql/get_labels_query.rb index 23efbc33581019..bc3129de8b146e 100644 --- a/lib/bulk_imports/groups/graphql/get_labels_query.rb +++ b/lib/bulk_imports/groups/graphql/get_labels_query.rb @@ -31,7 +31,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:labels) + cursor: context.next_page } end diff --git a/lib/bulk_imports/groups/graphql/get_members_query.rb b/lib/bulk_imports/groups/graphql/get_members_query.rb index e3a78124a47cb1..65397ffd75a039 100644 --- a/lib/bulk_imports/groups/graphql/get_members_query.rb +++ b/lib/bulk_imports/groups/graphql/get_members_query.rb @@ -34,7 +34,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:group_members) + cursor: context.next_page } end diff --git a/lib/bulk_imports/groups/pipelines/entity_finisher.rb 
b/lib/bulk_imports/groups/pipelines/entity_finisher.rb index 06e7011173bca3..aca537cc7b51ba 100644 --- a/lib/bulk_imports/groups/pipelines/entity_finisher.rb +++ b/lib/bulk_imports/groups/pipelines/entity_finisher.rb @@ -4,9 +4,8 @@ module BulkImports module Groups module Pipelines class EntityFinisher - def initialize(context, pipeline_status) + def initialize(context) @context = context - @pipeline_status = pipeline_status end def run @@ -17,7 +16,7 @@ def run private - attr_reader :context, :pipeline_status + attr_reader :context def logger @logger ||= Gitlab::Import::Logger.build diff --git a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb index 9f8b86827510ac..61d3e6c700e949 100644 --- a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb @@ -16,8 +16,7 @@ def load(context, data) end def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :labels, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/lib/bulk_imports/groups/pipelines/members_pipeline.rb b/lib/bulk_imports/groups/pipelines/members_pipeline.rb index 32fc931e8c33a2..d29bd74c5ae01e 100644 --- a/lib/bulk_imports/groups/pipelines/members_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/members_pipeline.rb @@ -19,8 +19,7 @@ def load(context, data) end def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :group_members, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb index 93e35a689a4e75..d358e65d05e88c 100644 --- a/lib/bulk_imports/importers/group_importer.rb +++ b/lib/bulk_imports/importers/group_importer.rb @@ -8,26 +8,29 @@ def initialize(entity) end def execute - logger.info(importer: self.class.name, message: 
'start', entity_id: entity.id) - - stages = stages_list + logger.info( + importer: self.class.name, + message: 'start', + entity_id: entity.id + ) # Store all the pipelines to run in the database # They have a order column, which indicates the order they'll run - # Pipelines of the same entity in the same order number will run in parallel - stages.each.with_index do |pipelines, stage_order| + # Pipelines of the same entity in the same order number will run + # in parallel + stages.each.with_index do |pipelines, stage| Array.wrap(pipelines).each do |pipeline| - entity.pipeline_statuses.create!( + entity.trackers.create!( pipeline_name: pipeline.name, - stage_order: stage_order + stage: stage ) end end # Ensure to be the last Pipeline to run - entity.pipeline_statuses.create!( - pipeline_name: BulkImports::Groups::Pipelines::EntityFinisher, - stage_order: stages.length + entity.trackers.create!( + pipeline_name: BulkImports::Groups::Pipelines::EntityFinisher.name, + stage: stages.length ) BulkImports::StageWorker.perform_async(entity.id) @@ -37,9 +40,13 @@ def execute attr_reader :entity + def stages + @stages ||= stage_list + end + # List of pipelines to run # Pipelines grouped together in a sub-array will run in parallel - def stages_list + def stage_list [ BulkImports::Groups::Pipelines::GroupPipeline, [ diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb index 60613ea3a5e522..302119d3cc586a 100644 --- a/lib/bulk_imports/pipeline.rb +++ b/lib/bulk_imports/pipeline.rb @@ -7,15 +7,15 @@ module Pipeline include Gitlab::ClassAttributes include Runner - def initialize(context, pipeline_status) + def initialize(context) @context = context - @pipeline_status = pipeline_status + @tracker = context.tracker end included do private - attr_reader :context, :pipeline_status + attr_reader :context, :tracker # Fetch pipeline extractor. 
# An extractor is defined either by instance `#extract(context)` method diff --git a/lib/bulk_imports/pipeline/context.rb b/lib/bulk_imports/pipeline/context.rb index dd121b2dbedcca..8fdc32ced4da9f 100644 --- a/lib/bulk_imports/pipeline/context.rb +++ b/lib/bulk_imports/pipeline/context.rb @@ -3,11 +3,12 @@ module BulkImports module Pipeline class Context - attr_reader :entity, :bulk_import + attr_reader :tracker, :entity, :bulk_import attr_accessor :extra - def initialize(entity, extra = {}) - @entity = entity + def initialize(tracker, extra = {}) + @tracker = tracker + @entity = tracker.entity @bulk_import = entity.bulk_import @extra = extra end @@ -23,6 +24,14 @@ def current_user def configuration bulk_import.configuration end + + def has_next_page? + tracker.has_next_page? + end + + def next_page + tracker.next_page + end end end end diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index 1c3784f5e8f201..e13379d7637976 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -64,7 +64,7 @@ def extracted_data_from def mark_as_failed warn(message: 'Pipeline failed', pipeline_class: pipeline) - pipeline_status.fail_op! + tracker.fail_op! context.entity.fail_op! end @@ -80,7 +80,7 @@ def log_skip(extra = {}) pipeline_class: pipeline }.merge(extra) - pipeline_status.skip! + tracker.skip! 
info(log) end @@ -95,6 +95,8 @@ def log_import_failure(exception, step) correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id } + logger.error(log_params(attributes)) + BulkImports::Failure.create(attributes) end -- GitLab From 3d494d50da4776dc398fc6f5bc2760a39d77021a Mon Sep 17 00:00:00 2001 From: Kassio Borges Date: Thu, 25 Feb 2021 18:40:15 +0000 Subject: [PATCH 4/6] Avoid pooling on StageWorker --- app/workers/bulk_imports/pipeline_worker.rb | 2 ++ app/workers/bulk_imports/stage_worker.rb | 10 ---------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 84b265f1ee7d42..afde96ae65aaf1 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -27,6 +27,8 @@ def perform(tracker_id) message: 'Pipeline does not exist' )) end + ensure + ::BulkImports::StageWorker.perform_async(tracker.entity.id, tracker.stage) end def run(tracker) diff --git a/app/workers/bulk_imports/stage_worker.rb b/app/workers/bulk_imports/stage_worker.rb index 4cc6538c18d358..4d7fa0624b450c 100644 --- a/app/workers/bulk_imports/stage_worker.rb +++ b/app/workers/bulk_imports/stage_worker.rb @@ -8,10 +8,6 @@ class StageWorker sidekiq_options retry: false, dead: false - worker_has_external_dependencies! 
- - TIMEOUT = 5.seconds - def perform(entity_id, current_stage = nil) logger.info( worker: self.class.name, @@ -31,12 +27,6 @@ def perform(entity_id, current_stage = nil) trackers.each do |tracker| BulkImports::PipelineWorker.perform_async(tracker.id) end - - self.class.perform_in( - TIMEOUT, - entity_id, - trackers.first.stage - ) end rescue => e logger.error(worker: self.class.name, message: e.message) -- GitLab From 87ea424d229c7c799df6190eacad6d21a8c8a742 Mon Sep 17 00:00:00 2001 From: Kassio Borges Date: Tue, 2 Mar 2021 01:25:19 +0000 Subject: [PATCH 5/6] Remove unnecessary worker Also, rename Groups::Importer => Groups::Stages for clarity --- app/models/bulk_imports/entity.rb | 13 ---- app/models/bulk_imports/tracker.rb | 13 ++++ app/services/bulk_import_service.rb | 15 +++-- app/workers/bulk_import_worker.rb | 4 +- app/workers/bulk_imports/entity_worker.rb | 38 +++++++---- app/workers/bulk_imports/pipeline_worker.rb | 4 +- app/workers/bulk_imports/stage_worker.rb | 41 ------------ .../group_importer.rb => groups/stages.rb} | 4 +- .../groups/pipelines/entity_finisher.rb | 3 +- .../pipelines/subgroup_entities_pipeline.rb | 5 +- lib/bulk_imports/groups/stages.rb | 42 ++++++++++++ lib/bulk_imports/importers/group_importer.rb | 67 ------------------- lib/bulk_imports/pipeline/runner.rb | 2 +- 13 files changed, 104 insertions(+), 147 deletions(-) delete mode 100644 app/workers/bulk_imports/stage_worker.rb rename ee/lib/ee/bulk_imports/{importers/group_importer.rb => groups/stages.rb} (90%) create mode 100644 lib/bulk_imports/groups/stages.rb delete mode 100644 lib/bulk_imports/importers/group_importer.rb diff --git a/app/models/bulk_imports/entity.rb b/app/models/bulk_imports/entity.rb index 30aa50ae2dd174..089eeb7cdfaa20 100644 --- a/app/models/bulk_imports/entity.rb +++ b/app/models/bulk_imports/entity.rb @@ -68,19 +68,6 @@ class BulkImports::Entity < ApplicationRecord end end - def next_stage_pipelines - trackers.where( - stage: 
trackers.select('MIN(stage)').with_status(:created) - ) - end - - def stage_running?(stage) - trackers - .where(stage: stage) - .with_statuses(:created, :started) - .exists? - end - private def validate_parent_is_a_group diff --git a/app/models/bulk_imports/tracker.rb b/app/models/bulk_imports/tracker.rb index a01e3b8d0d832e..546bd0dc0fe6e4 100644 --- a/app/models/bulk_imports/tracker.rb +++ b/app/models/bulk_imports/tracker.rb @@ -15,6 +15,19 @@ class BulkImports::Tracker < ApplicationRecord validates :stage, presence: true validates :pipeline_name, presence: true + scope :next_stage_for, -> (entity_id) { + entity_scope = where(bulk_import_entity_id: entity_id) + next_stage_scope = entity_scope.with_status(:created).select('MIN(stage)') + + entity_scope.where(stage: next_stage_scope) + } + + def self.stage_running?(entity_id, stage) + where(stage: stage, bulk_import_entity_id: entity_id) + .with_status(:created, :started) + .exists? + end + state_machine :status, initial: :created do state :created, value: 0 state :started, value: 1 diff --git a/app/services/bulk_import_service.rb b/app/services/bulk_import_service.rb index 4e13e967dbd488..e6ec1ccd5367b7 100644 --- a/app/services/bulk_import_service.rb +++ b/app/services/bulk_import_service.rb @@ -54,14 +54,17 @@ def create_bulk_import bulk_import = BulkImport.create!(user: current_user, source_type: 'gitlab') bulk_import.create_configuration!(credentials.slice(:url, :access_token)) - params.each do |entity| - BulkImports::Entity.create!( + params.each do |entity_params| + entity = BulkImports::Entity.create!( bulk_import: bulk_import, - source_type: entity[:source_type], - source_full_path: entity[:source_full_path], - destination_name: entity[:destination_name], - destination_namespace: entity[:destination_namespace] + source_type: entity_params[:source_type], + source_full_path: entity_params[:source_full_path], + destination_name: entity_params[:destination_name], + destination_namespace: 
entity_params[:destination_namespace] ) + + # TODO: duplicated with `BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline` + BulkImports::Groups::Stages.create_trackers!(entity) end bulk_import diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb index e6bc54895a7fef..2554ec6238fa6c 100644 --- a/app/workers/bulk_import_worker.rb +++ b/app/workers/bulk_import_worker.rb @@ -21,9 +21,9 @@ def perform(bulk_import_id) @bulk_import.start! if @bulk_import.created? created_entities.first(next_batch_size).each do |entity| - entity.start! + jid = BulkImports::EntityWorker.perform_async(entity.id) - BulkImports::EntityWorker.perform_async(entity.id) + entity.update!(status_event: 'start', jid: jid) end re_enqueue diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb index 5b41ccbdea1cac..40a0ed998d0061 100644 --- a/app/workers/bulk_imports/entity_worker.rb +++ b/app/workers/bulk_imports/entity_worker.rb @@ -10,24 +10,38 @@ class EntityWorker # rubocop:disable Scalability/IdempotentWorker worker_has_external_dependencies! 
- def perform(entity_id) - entity = BulkImports::Entity.with_status(:started).find_by_id(entity_id) + def perform(entity_id, current_stage = nil) + logger.info( + worker: self.class.name, + entity_id: entity_id, + current_stage: current_stage + ) - if entity - entity.update!(jid: jid) + return if stage_running?(entity_id, current_stage) - BulkImports::Importers::GroupImporter.new(entity).execute + next_stage_for(entity_id).each do |tracker| + BulkImports::PipelineWorker.perform_async(tracker.id) end - rescue => e - extra = { - bulk_import_id: entity&.bulk_import&.id, - entity_id: entity&.id - } + logger.error(worker: self.class.name, message: e.message) + + raise e + end + + private - Gitlab::ErrorTracking.track_exception(e, extra) + def stage_running?(entity_id, stage) + return unless stage + + BulkImports::Tracker.stage_running?(entity_id, stage) + end + + def next_stage_for(entity_id) + @next_stage_for ||= BulkImports::Tracker.next_stage_for(entity_id) + end - entity&.fail_op + def logger + @logger ||= Gitlab::Import::Logger end end end diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index afde96ae65aaf1..a55a4bff0f7c98 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -28,9 +28,11 @@ def perform(tracker_id) )) end ensure - ::BulkImports::StageWorker.perform_async(tracker.entity.id, tracker.stage) + ::BulkImports::EntityWorker.perform_async(tracker.entity.id, tracker.stage) end + private + def run(tracker) tracker.update!(status_event: 'start', jid: jid) diff --git a/app/workers/bulk_imports/stage_worker.rb b/app/workers/bulk_imports/stage_worker.rb deleted file mode 100644 index 4d7fa0624b450c..00000000000000 --- a/app/workers/bulk_imports/stage_worker.rb +++ /dev/null @@ -1,41 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - class StageWorker - include ApplicationWorker - - feature_category :importers - - sidekiq_options retry: 
false, dead: false - - def perform(entity_id, current_stage = nil) - logger.info( - worker: self.class.name, - entity_id: entity_id, - current_stage: current_stage - ) - - entity = BulkImports::Entity.find(entity_id) - - if current_stage && entity.stage_running?(current_stage) - self.class.perform_in(TIMEOUT, entity_id, current_stage) - else - trackers = entity.next_stage_pipelines - - return if trackers.blank? - - trackers.each do |tracker| - BulkImports::PipelineWorker.perform_async(tracker.id) - end - end - rescue => e - logger.error(worker: self.class.name, message: e.message) - - raise e - end - - def logger - @logger ||= Gitlab::Import::Logger - end - end -end diff --git a/ee/lib/ee/bulk_imports/importers/group_importer.rb b/ee/lib/ee/bulk_imports/groups/stages.rb similarity index 90% rename from ee/lib/ee/bulk_imports/importers/group_importer.rb rename to ee/lib/ee/bulk_imports/groups/stages.rb index f89761d3af34b5..391198ebaae0aa 100644 --- a/ee/lib/ee/bulk_imports/importers/group_importer.rb +++ b/ee/lib/ee/bulk_imports/groups/stages.rb @@ -2,8 +2,8 @@ module EE module BulkImports - module Importers - module GroupImporter + module Groups + module Stages extend ::Gitlab::Utils::Override private diff --git a/lib/bulk_imports/groups/pipelines/entity_finisher.rb b/lib/bulk_imports/groups/pipelines/entity_finisher.rb index aca537cc7b51ba..ff808adc8d39db 100644 --- a/lib/bulk_imports/groups/pipelines/entity_finisher.rb +++ b/lib/bulk_imports/groups/pipelines/entity_finisher.rb @@ -9,7 +9,8 @@ def initialize(context) end def run - context.entity.finish! + context.entity.finish! unless context.entity.finished? + context.tracker.finish! unless context.tracker.finished? 
logger.info(entity_id: context.entity.id, message: 'Entity Finished') end diff --git a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb index c47a8bd1daa280..c626750cab30ac 100644 --- a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb @@ -11,7 +11,10 @@ class SubgroupEntitiesPipeline transformer BulkImports::Groups::Transformers::SubgroupToEntityTransformer def load(context, data) - context.bulk_import.entities.create!(data) + entity = context.bulk_import.entities.create!(data) + + # TODO: duplicated with `BulkImportService::BulkImport::BulkImports::Entity` + BulkImports::Groups::Stages.create_trackers!(entity) end end end diff --git a/lib/bulk_imports/groups/stages.rb b/lib/bulk_imports/groups/stages.rb new file mode 100644 index 00000000000000..b7fd72e9f1042c --- /dev/null +++ b/lib/bulk_imports/groups/stages.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + class Stages + include Singleton + + def self.create_trackers!(entity) + instance.stages.each.with_index do |pipelines, stage| + Array.wrap(pipelines).each do |pipeline| + entity.trackers.create!(stage: stage, pipeline_name: pipeline) + end + end + end + + def stages + @stages ||= stage_list << last_stage + end + + private + + def last_stage + BulkImports::Groups::Pipelines::EntityFinisher + end + + # List of pipelines to run + # Pipelines grouped together in a sub-array will run in parallel + def stage_list + [ + BulkImports::Groups::Pipelines::GroupPipeline, + [ + BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, + BulkImports::Groups::Pipelines::MembersPipeline, + BulkImports::Groups::Pipelines::LabelsPipeline + ] + ] + end + end + end +end + +BulkImports::Groups::Stages.prepend_if_ee('EE::BulkImports::Groups::Stages') diff --git a/lib/bulk_imports/importers/group_importer.rb 
b/lib/bulk_imports/importers/group_importer.rb deleted file mode 100644 index d358e65d05e88c..00000000000000 --- a/lib/bulk_imports/importers/group_importer.rb +++ /dev/null @@ -1,67 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - module Importers - class GroupImporter - def initialize(entity) - @entity = entity - end - - def execute - logger.info( - importer: self.class.name, - message: 'start', - entity_id: entity.id - ) - - # Store all the pipelines to run in the database - # They have a order column, which indicates the order they'll run - # Pipelines of the same entity in the same order number will run - # in parallel - stages.each.with_index do |pipelines, stage| - Array.wrap(pipelines).each do |pipeline| - entity.trackers.create!( - pipeline_name: pipeline.name, - stage: stage - ) - end - end - - # Ensure to be the last Pipeline to run - entity.trackers.create!( - pipeline_name: BulkImports::Groups::Pipelines::EntityFinisher.name, - stage: stages.length - ) - - BulkImports::StageWorker.perform_async(entity.id) - end - - private - - attr_reader :entity - - def stages - @stages ||= stage_list - end - - # List of pipelines to run - # Pipelines grouped together in a sub-array will run in parallel - def stage_list - [ - BulkImports::Groups::Pipelines::GroupPipeline, - [ - BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, - BulkImports::Groups::Pipelines::MembersPipeline, - BulkImports::Groups::Pipelines::LabelsPipeline - ] - ] - end - - def logger - @logger ||= Gitlab::Import::Logger.build - end - end - end -end - -BulkImports::Importers::GroupImporter.prepend_if_ee('EE::BulkImports::Importers::GroupImporter') diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index e13379d7637976..fd6b065ed8bd28 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -26,7 +26,7 @@ def run end end - if respond_to?(:after_run) + if extracted_data.data.present? 
&& respond_to?(:after_run) run_pipeline_step(:after_run) do after_run(extracted_data) end -- GitLab From 90025ff6be42ccc6cecbd6f97b022b66ed225ca4 Mon Sep 17 00:00:00 2001 From: Kassio Borges Date: Tue, 2 Mar 2021 15:05:52 +0000 Subject: [PATCH 6/6] db constraint, avoid spread creation of trackers --- app/services/bulk_import_service.rb | 2 -- app/workers/bulk_import_worker.rb | 6 +++--- ...dd_pipeline_and_stage_to_bulk_import_trackers.rb | 13 +++++++++++-- db/structure.sql | 12 +++++++++--- .../groups/pipelines/subgroup_entities_pipeline.rb | 3 --- lib/bulk_imports/groups/stages.rb | 4 ++-- 6 files changed, 25 insertions(+), 15 deletions(-) diff --git a/app/services/bulk_import_service.rb b/app/services/bulk_import_service.rb index e6ec1ccd5367b7..0d73aba84bcb11 100644 --- a/app/services/bulk_import_service.rb +++ b/app/services/bulk_import_service.rb @@ -63,8 +63,6 @@ def create_bulk_import destination_namespace: entity_params[:destination_namespace] ) - # TODO: duplicated with `BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline` - BulkImports::Groups::Stages.create_trackers!(entity) end bulk_import diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb index 2554ec6238fa6c..9e40fa85bd4d4b 100644 --- a/app/workers/bulk_import_worker.rb +++ b/app/workers/bulk_import_worker.rb @@ -21,9 +21,9 @@ def perform(bulk_import_id) @bulk_import.start! if @bulk_import.created? created_entities.first(next_batch_size).each do |entity| - jid = BulkImports::EntityWorker.perform_async(entity.id) - - entity.update!(status_event: 'start', jid: jid) + BulkImports::Groups::Stages.create_trackers_for(entity) + BulkImports::EntityWorker.perform_async(entity.id) + entity.start! 
end re_enqueue diff --git a/db/migrate/20210223141620_add_pipeline_and_stage_to_bulk_import_trackers.rb b/db/migrate/20210223141620_add_pipeline_and_stage_to_bulk_import_trackers.rb index b0eb6cd3d81b01..aace626487dfb2 100644 --- a/db/migrate/20210223141620_add_pipeline_and_stage_to_bulk_import_trackers.rb +++ b/db/migrate/20210223141620_add_pipeline_and_stage_to_bulk_import_trackers.rb @@ -5,6 +5,8 @@ class AddPipelineAndStageToBulkImportTrackers < ActiveRecord::Migration[6.0] DOWNTIME = false + UNIQUE_INDEX = 'unique_pipeline_per_stage_per_entity' + disable_ddl_transaction! def up @@ -15,12 +17,17 @@ def up add_column :bulk_import_trackers, :stage, :integer, null: false, limit: 2 add_column :bulk_import_trackers, :status, :integer, null: false, limit: 2 - change_column_null :bulk_import_trackers, :relation, false + change_column_null :bulk_import_trackers, :relation, true end end add_text_limit :bulk_import_trackers, :pipeline_name, 255 add_text_limit :bulk_import_trackers, :jid, 255 + + add_concurrent_index :bulk_import_trackers, + [:bulk_import_entity_id, :pipeline_name, :stage], + name: UNIQUE_INDEX, + unique: true end def down @@ -30,7 +37,9 @@ def down remove_column :bulk_import_trackers, :stage, :integer remove_column :bulk_import_trackers, :status, :integer - change_column_null :bulk_import_trackers, :relation, true + change_column_null :bulk_import_trackers, :relation, false end + + remove_concurrent_index_by_name :bulk_import_trackers, UNIQUE_INDEX end end diff --git a/db/structure.sql b/db/structure.sql index 408d8c3d6f7ebe..0b55a08dd44d42 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -10085,7 +10085,7 @@ ALTER SEQUENCE bulk_import_failures_id_seq OWNED BY bulk_import_failures.id; CREATE TABLE bulk_import_trackers ( id bigint NOT NULL, bulk_import_entity_id bigint NOT NULL, - relation text NOT NULL, + relation text, next_page text, has_next_page boolean DEFAULT false NOT NULL, jid text, @@ -11035,6 +11035,8 @@ CREATE TABLE cluster_agent_tokens 
( token_encrypted text NOT NULL, created_by_user_id bigint, description text, + name text, + CONSTRAINT check_2b79dbb315 CHECK ((char_length(name) <= 255)), CONSTRAINT check_4e4ec5070a CHECK ((char_length(description) <= 1024)), CONSTRAINT check_c60daed227 CHECK ((char_length(token_encrypted) <= 255)) ); @@ -14700,7 +14702,8 @@ CREATE TABLE onboarding_progresses ( security_scan_enabled_at timestamp with time zone, issue_auto_closed_at timestamp with time zone, repository_imported_at timestamp with time zone, - repository_mirrored_at timestamp with time zone + repository_mirrored_at timestamp with time zone, + issue_created_at timestamp with time zone ); CREATE SEQUENCE onboarding_progresses_id_seq @@ -17931,7 +17934,8 @@ CREATE TABLE user_preferences ( tab_width smallint, experience_level smallint, view_diffs_file_by_file boolean DEFAULT false NOT NULL, - gitpod_enabled boolean DEFAULT false NOT NULL + gitpod_enabled boolean DEFAULT false NOT NULL, + markdown_surround_selection boolean DEFAULT true NOT NULL ); CREATE SEQUENCE user_preferences_id_seq @@ -23956,6 +23960,8 @@ CREATE UNIQUE INDEX uniq_pkgs_debian_project_distributions_project_id_and_suite CREATE UNIQUE INDEX unique_merge_request_metrics_by_merge_request_id ON merge_request_metrics USING btree (merge_request_id); +CREATE UNIQUE INDEX unique_pipeline_per_stage_per_entity ON bulk_import_trackers USING btree (bulk_import_entity_id, pipeline_name, stage); + CREATE INDEX user_follow_users_followee_id_idx ON user_follow_users USING btree (followee_id); CREATE UNIQUE INDEX vulnerability_feedback_unique_idx ON vulnerability_feedback USING btree (project_id, category, feedback_type, project_fingerprint); diff --git a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb index c626750cab30ac..de484e0c6f6e46 100644 --- a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb +++ 
b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb @@ -12,9 +12,6 @@ class SubgroupEntitiesPipeline def load(context, data) entity = context.bulk_import.entities.create!(data) - - # TODO: duplicated with `BulkImportService::BulkImport::BulkImports::Entity` - BulkImports::Groups::Stages.create_trackers!(entity) end end end diff --git a/lib/bulk_imports/groups/stages.rb b/lib/bulk_imports/groups/stages.rb index b7fd72e9f1042c..f16f28b8655da4 100644 --- a/lib/bulk_imports/groups/stages.rb +++ b/lib/bulk_imports/groups/stages.rb @@ -5,10 +5,10 @@ module Groups class Stages include Singleton - def self.create_trackers!(entity) + def self.create_trackers_for(entity) instance.stages.each.with_index do |pipelines, stage| Array.wrap(pipelines).each do |pipeline| - entity.trackers.create!(stage: stage, pipeline_name: pipeline) + entity.trackers.create(stage: stage, pipeline_name: pipeline) end end end -- GitLab