diff --git a/app/models/bulk_imports/entity.rb b/app/models/bulk_imports/entity.rb index ae1e36938092c1a340abd295537db3a6dd82c831..089eeb7cdfaa20507d03b26219b508e2d52ce051 100644 --- a/app/models/bulk_imports/entity.rb +++ b/app/models/bulk_imports/entity.rb @@ -68,25 +68,6 @@ class BulkImports::Entity < ApplicationRecord end end - def update_tracker_for(relation:, has_next_page:, next_page: nil) - attributes = { - relation: relation, - has_next_page: has_next_page, - next_page: next_page, - bulk_import_entity_id: id - } - - trackers.upsert(attributes, unique_by: %i[bulk_import_entity_id relation]) - end - - def has_next_page?(relation) - trackers.find_by(relation: relation)&.has_next_page - end - - def next_page_for(relation) - trackers.find_by(relation: relation)&.next_page - end - private def validate_parent_is_a_group diff --git a/app/models/bulk_imports/tracker.rb b/app/models/bulk_imports/tracker.rb index 02e0904e1afd0f2d10b5713a7b2e836cbb8bc43d..546bd0dc0fe6e462f0b684f65197c0a11b529ec2 100644 --- a/app/models/bulk_imports/tracker.rb +++ b/app/models/bulk_imports/tracker.rb @@ -10,9 +10,46 @@ class BulkImports::Tracker < ApplicationRecord foreign_key: :bulk_import_entity_id, optional: false - validates :relation, - presence: true, - uniqueness: { scope: :bulk_import_entity_id } - validates :next_page, presence: { if: :has_next_page? } + + validates :stage, presence: true + validates :pipeline_name, presence: true + + scope :next_stage_for, -> (entity_id) { + entity_scope = where(bulk_import_entity_id: entity_id) + next_stage_scope = entity_scope.with_status(:created).select('MIN(stage)') + + entity_scope.where(stage: next_stage_scope) + } + + def self.stage_running?(entity_id, stage) + where(stage: stage, bulk_import_entity_id: entity_id) + .with_status(:created, :started) + .exists? 
+ end + + state_machine :status, initial: :created do + state :created, value: 0 + state :started, value: 1 + state :finished, value: 2 + state :failed, value: -1 + state :skipped, value: -2 + + event :start do + transition created: :started + end + + event :finish do + transition started: :finished + transition failed: :failed + end + + event :skip do + transition any => :skipped + end + + event :fail_op do + transition any => :failed + end + end end diff --git a/app/services/bulk_import_service.rb b/app/services/bulk_import_service.rb index 4e13e967dbd4886028f3ba909b256394c9cac009..0d73aba84bcb11400ff1af5bc7479e8a344a60d1 100644 --- a/app/services/bulk_import_service.rb +++ b/app/services/bulk_import_service.rb @@ -54,14 +54,15 @@ def create_bulk_import bulk_import = BulkImport.create!(user: current_user, source_type: 'gitlab') bulk_import.create_configuration!(credentials.slice(:url, :access_token)) - params.each do |entity| - BulkImports::Entity.create!( + params.each do |entity_params| + entity = BulkImports::Entity.create!( bulk_import: bulk_import, - source_type: entity[:source_type], - source_full_path: entity[:source_full_path], - destination_name: entity[:destination_name], - destination_namespace: entity[:destination_namespace] + source_type: entity_params[:source_type], + source_full_path: entity_params[:source_full_path], + destination_name: entity_params[:destination_name], + destination_namespace: entity_params[:destination_namespace] ) + end bulk_import diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index bded69e7f47cfa911f08c6d18d053c54eab32b55..8083ab572ae41e568c0313e5d003e5dc8214d570 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -1469,6 +1469,22 @@ :weight: 1 :idempotent: :tags: [] +- :name: bulk_imports_pipeline + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: + :tags: [] +- :name: bulk_imports_stage + 
:feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: + :tags: [] - :name: chat_notification :feature_category: :chatops :has_external_dependencies: true diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb index e6bc54895a7fef59e13392016b04562e736c0a8f..9e40fa85bd4d4bdd4d617caf08802ed6d43cff8c 100644 --- a/app/workers/bulk_import_worker.rb +++ b/app/workers/bulk_import_worker.rb @@ -21,9 +21,9 @@ def perform(bulk_import_id) @bulk_import.start! if @bulk_import.created? created_entities.first(next_batch_size).each do |entity| - entity.start! - + BulkImports::Groups::Stages.create_trackers_for(entity) BulkImports::EntityWorker.perform_async(entity.id) + entity.start! end re_enqueue diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb index 5b41ccbdea1cac6dbcd7fc30d6cfef13d786df70..40a0ed998d006106e21b2c98595ca0593dddc739 100644 --- a/app/workers/bulk_imports/entity_worker.rb +++ b/app/workers/bulk_imports/entity_worker.rb @@ -10,24 +10,38 @@ class EntityWorker # rubocop:disable Scalability/IdempotentWorker worker_has_external_dependencies! 
# Enqueues a BulkImports::PipelineWorker job for every tracker returned by
# BulkImports::Tracker.next_stage_for for the given entity.
#
# PipelineWorker calls back into this worker with the stage it has just
# worked on. While BulkImports::Tracker.stage_running? reports that stage as
# still running, the call is a no-op, so the following stage is not
# scheduled early.
#
# @param entity_id [Integer] id of the entity being imported
# @param current_stage [Integer, nil] stage reported by the calling
#   PipelineWorker; nil when no stage has been worked on yet
# @raise [StandardError] anything raised while scheduling is logged and then
#   re-raised unchanged
def perform(entity_id, current_stage = nil)
  logger.info(
    worker: self.class.name,
    entity_id: entity_id,
    current_stage: current_stage
  )

  return if stage_running?(entity_id, current_stage)

  next_stage_for(entity_id).each do |tracker|
    BulkImports::PipelineWorker.perform_async(tracker.id)
  end
rescue => e
  # The rescue clause is required: without it `e` is undefined and every
  # call would end in a NameError after the jobs were enqueued.
  logger.error(worker: self.class.name, message: e.message)

  raise
end

private

# @return [Boolean, nil] nil when no stage was given, otherwise whether the
#   stage still has trackers that BulkImports::Tracker considers running
def stage_running?(entity_id, stage)
  return unless stage

  BulkImports::Tracker.stage_running?(entity_id, stage)
end

# Not memoized on purpose: the result depends on entity_id, so caching it in
# an instance variable would hand back another entity's trackers if the same
# worker instance were ever reused.
def next_stage_for(entity_id)
  BulkImports::Tracker.next_stage_for(entity_id)
end

# Built the same way as in PipelineWorker and EntityFinisher.
def logger
  @logger ||= Gitlab::Import::Logger.build
end
# Runs the pipeline recorded on a tracker, then re-enqueues EntityWorker with
# the tracker's entity id and stage.
#
# Only trackers still in the `created` state are picked up. When no such
# tracker exists, an error is logged and nothing is re-enqueued, because
# there is no tracker to read the entity and stage from.
#
# @param tracker_id [Integer] id of the BulkImports::Tracker to run
def perform(tracker_id)
  # find_by_id returns nil instead of raising, so the "does not exist"
  # branch below is reachable.
  tracker = ::BulkImports::Tracker.with_status(:created).find_by_id(tracker_id)

  unless tracker
    logger.error(log_params(
      tracker_id: tracker_id,
      message: 'Pipeline does not exist'
    ))

    return
  end

  logger.info(log_params(
    entity_id: tracker.entity.id,
    pipeline_name: tracker.pipeline_name
  ))

  run(tracker)
ensure
  # `tracker` is nil when the lookup found nothing or raised.
  ::BulkImports::EntityWorker.perform_async(tracker.entity.id, tracker.stage) if tracker
end

private

# Starts the tracker, runs its pipeline class and finishes the tracker.
# Any error raised on the way marks the tracker as failed and is logged
# rather than re-raised.
def run(tracker)
  tracker.update!(status_event: 'start', jid: jid)

  context = ::BulkImports::Pipeline::Context.new(tracker)
  pipeline_class = tracker.pipeline_name.constantize

  pipeline_class.new(context).run

  # The pipeline may already have moved the tracker to a state the `finish`
  # event has no transition from (skipped or finished). Calling finish!
  # there would raise and the rescue below would record it as failed.
  tracker.finish! if tracker.can_finish?
rescue => e
  tracker.fail_op!

  logger.error(log_params(
    entity_id: tracker.entity.id,
    pipeline_name: tracker.pipeline_name,
    message: e.message
  ))
end

# Merges the worker name into a structured log payload.
def log_params(extra = {})
  { worker: self.class.name }.merge(extra)
end

def logger
  @logger ||= Gitlab::Import::Logger.build
end
# Adds the jid, pipeline_name, stage and status columns to
# bulk_import_trackers, makes `relation` nullable, limits the new text
# columns to 255 characters and adds the unique
# (bulk_import_entity_id, pipeline_name, stage) index.
def up
  # The guard has to look at the table being altered. Checking another table
  # would leave the add_column calls unprotected when the migration is
  # re-run after a partial failure.
  unless column_exists?(:bulk_import_trackers, :pipeline_name, :text)
    with_lock_retries do
      add_column :bulk_import_trackers, :jid, :text
      add_column :bulk_import_trackers, :pipeline_name, :text
      add_column :bulk_import_trackers, :stage, :integer, null: false, limit: 2
      add_column :bulk_import_trackers, :status, :integer, null: false, limit: 2

      change_column_null :bulk_import_trackers, :relation, true
    end
  end

  add_text_limit :bulk_import_trackers, :pipeline_name, 255
  add_text_limit :bulk_import_trackers, :jid, 255

  add_concurrent_index :bulk_import_trackers,
    [:bulk_import_entity_id, :pipeline_name, :stage],
    name: UNIQUE_INDEX,
    unique: true
end

# Reverses #up in the opposite order: the index goes first, while the
# columns it covers still exist, then the columns, then the NOT NULL
# constraint on `relation` is restored.
def down
  remove_concurrent_index_by_name :bulk_import_trackers, UNIQUE_INDEX

  with_lock_retries do
    remove_column :bulk_import_trackers, :jid, :text
    remove_column :bulk_import_trackers, :pipeline_name, :text
    remove_column :bulk_import_trackers, :stage, :integer
    remove_column :bulk_import_trackers, :status, :integer

    change_column_null :bulk_import_trackers, :relation, false
  end
end
stage smallint NOT NULL, + status smallint NOT NULL, CONSTRAINT check_2d45cae629 CHECK ((char_length(relation) <= 255)), + CONSTRAINT check_328a4e67b0 CHECK ((char_length(pipeline_name) <= 255)), CONSTRAINT check_40aeaa600b CHECK ((char_length(next_page) <= 255)), + CONSTRAINT check_603f91cb06 CHECK ((char_length(jid) <= 255)), CONSTRAINT check_next_page_requirement CHECK (((has_next_page IS FALSE) OR (next_page IS NOT NULL))) ); @@ -11029,6 +11035,8 @@ CREATE TABLE cluster_agent_tokens ( token_encrypted text NOT NULL, created_by_user_id bigint, description text, + name text, + CONSTRAINT check_2b79dbb315 CHECK ((char_length(name) <= 255)), CONSTRAINT check_4e4ec5070a CHECK ((char_length(description) <= 1024)), CONSTRAINT check_c60daed227 CHECK ((char_length(token_encrypted) <= 255)) ); @@ -14694,7 +14702,8 @@ CREATE TABLE onboarding_progresses ( security_scan_enabled_at timestamp with time zone, issue_auto_closed_at timestamp with time zone, repository_imported_at timestamp with time zone, - repository_mirrored_at timestamp with time zone + repository_mirrored_at timestamp with time zone, + issue_created_at timestamp with time zone ); CREATE SEQUENCE onboarding_progresses_id_seq @@ -17925,7 +17934,8 @@ CREATE TABLE user_preferences ( tab_width smallint, experience_level smallint, view_diffs_file_by_file boolean DEFAULT false NOT NULL, - gitpod_enabled boolean DEFAULT false NOT NULL + gitpod_enabled boolean DEFAULT false NOT NULL, + markdown_surround_selection boolean DEFAULT true NOT NULL ); CREATE SEQUENCE user_preferences_id_seq @@ -23950,6 +23960,8 @@ CREATE UNIQUE INDEX uniq_pkgs_debian_project_distributions_project_id_and_suite CREATE UNIQUE INDEX unique_merge_request_metrics_by_merge_request_id ON merge_request_metrics USING btree (merge_request_id); +CREATE UNIQUE INDEX unique_pipeline_per_stage_per_entity ON bulk_import_trackers USING btree (bulk_import_entity_id, pipeline_name, stage); + CREATE INDEX user_follow_users_followee_id_idx ON user_follow_users 
USING btree (followee_id); CREATE UNIQUE INDEX vulnerability_feedback_unique_idx ON vulnerability_feedback USING btree (project_id, category, feedback_type, project_fingerprint); diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb index 8aa9fb8da2f4b68d356d243475073f5e0053c107..39247565c873b2239e55fce5885c231f823dac80 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb @@ -32,11 +32,10 @@ def to_s def variables(context) iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_award_emoji" { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(tracker), + cursor: context.next_page, epic_iid: iid } end diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb index 1e93d3220eadc0a555d8f48dc02f89caa9a8d43e..2fd36ee9257c41c1639405e2a7cb2b4b13914327 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb @@ -34,11 +34,10 @@ def to_s def variables(context) iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_events" { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(tracker), + cursor: context.next_page, epic_iid: iid } end diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb index b292e6f631946e45453042589b4185cc3a4d5969..7c22e84a322190f4fd3082280d940b02f0807ac2 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb @@ -60,7 +60,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:epics) + cursor: context.next_page } 
end diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb index efc4e8f2e3c235302e6b6945dffcb6cdf4f6d4f7..723f171e7b689f729d0a2700e018a30a196460b5 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb @@ -18,6 +18,7 @@ class EpicAwardEmojiPipeline # rubocop: disable CodeReuse/ActiveRecord def initialize(context) @context = context + @tracker = context.tracker @group = context.group @epic_iids = @group.epics.order(iid: :desc).pluck(:iid) @@ -25,11 +26,7 @@ def initialize(context) end def after_run(extracted_data) - iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_award_emoji" - - context.entity.update_tracker_for( - relation: tracker, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb index e0d558c78cd6169a2e97578cca48feb8cbb349be..8e13fb23fa394e97cc0feb2dbc11888e8e0987cf 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb @@ -8,13 +8,14 @@ class EpicEventsPipeline include ::BulkImports::Pipeline extractor ::BulkImports::Common::Extractors::GraphqlExtractor, - query: EE::BulkImports::Groups::Graphql::GetEpicEventsQuery + query: EE::BulkImports::Groups::Graphql::GetEpicEventsQuery transformer ::BulkImports::Common::Transformers::ProhibitedAttributesTransformer transformer ::BulkImports::Common::Transformers::UserReferenceTransformer, reference: 'author' def initialize(context) @context = context + @tracker = context.tracker @group = context.group @epic_iids = @group.epics.order(iid: :desc).pluck(:iid) # rubocop: disable CodeReuse/ActiveRecord @@ -48,11 +49,7 @@ def load(context, 
# frozen_string_literal: true

module EE
  module BulkImports
    module Groups
      # Appends the epic pipelines to the stage list it overrides.
      module Stages
        extend ::Gitlab::Utils::Override

        private

        # Returns the overridden stage list followed by two more entries:
        # the epics pipeline on its own, then the epic award-emoji and
        # epic events pipelines grouped in one sub-array.
        override :stage_list
        def stage_list
          core_stages = super

          core_stages + [
            EE::BulkImports::Groups::Pipelines::EpicsPipeline,
            [
              EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline,
              EE::BulkImports::Groups::Pipelines::EpicEventsPipeline
            ]
          ]
        end
      end
    end
  end
end
# frozen_string_literal: true

module BulkImports
  module Groups
    module Pipelines
      # Marks the entity of the given pipeline context as finished.
      #
      # Exposes the same `new(context).run` interface that PipelineWorker
      # uses to run any pipeline class.
      class EntityFinisher
        # @param context [BulkImports::Pipeline::Context] must respond to
        #   #entity
        def initialize(context)
          @context = context
        end

        # Finishes the entity unless it is already finished, then logs it.
        #
        # The tracker is deliberately left alone. PipelineWorker#run calls
        # `tracker.finish!` itself once #run returns, and the tracker state
        # machine has no `finish` transition out of `finished`. Finishing the
        # tracker here would make that call raise and the tracker would be
        # recorded as failed.
        def run
          entity = context.entity

          entity.finish! unless entity.finished?

          logger.info(entity_id: entity.id, message: 'Entity Finished')
        end

        private

        attr_reader :context

        def logger
          @logger ||= Gitlab::Import::Logger.build
        end
      end
    end
  end
end
# frozen_string_literal: true

module BulkImports
  module Groups
    # Ordered list of the pipelines that make up a group import, and the
    # factory that creates one tracker per pipeline for an entity.
    class Stages
      include Singleton

      # Creates one tracker per pipeline on the entity. The tracker's stage
      # is the position of the pipeline (or of the sub-array containing it)
      # in #stages.
      #
      # Uses create! so that a tracker that cannot be saved raises here,
      # instead of silently leaving the entity without one of its pipelines.
      #
      # @param entity [BulkImports::Entity] must respond to #trackers
      def self.create_trackers_for(entity)
        instance.stages.each_with_index do |pipelines, stage|
          Array.wrap(pipelines).each do |pipeline|
            entity.trackers.create!(stage: stage, pipeline_name: pipeline)
          end
        end
      end

      # @return [Array] the stage list with the finishing stage appended,
      #   memoized after the first call
      def stages
        @stages ||= stage_list << last_stage
      end

      private

      def last_stage
        BulkImports::Groups::Pipelines::EntityFinisher
      end

      # List of pipelines to run
      # Pipelines grouped together in a sub-array will run in parallel
      def stage_list
        [
          BulkImports::Groups::Pipelines::GroupPipeline,
          [
            BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline,
            BulkImports::Groups::Pipelines::MembersPipeline,
            BulkImports::Groups::Pipelines::LabelsPipeline
          ]
        ]
      end
    end
  end
end

BulkImports::Groups::Stages.prepend_if_ee('EE::BulkImports::Groups::Stages')
- end - - private - - attr_reader :entity - - def pipelines - [ - BulkImports::Groups::Pipelines::GroupPipeline, - BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, - BulkImports::Groups::Pipelines::MembersPipeline, - BulkImports::Groups::Pipelines::LabelsPipeline - ] - end - end - end -end - -BulkImports::Importers::GroupImporter.prepend_if_ee('EE::BulkImports::Importers::GroupImporter') diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb index f8e5edc27d835ae99285018fbe4d0182aa8edef5..302119d3cc586ab7efd34256f531d832963b2c3d 100644 --- a/lib/bulk_imports/pipeline.rb +++ b/lib/bulk_imports/pipeline.rb @@ -9,12 +9,13 @@ module Pipeline def initialize(context) @context = context + @tracker = context.tracker end included do private - attr_reader :context + attr_reader :context, :tracker # Fetch pipeline extractor. # An extractor is defined either by instance `#extract(context)` method diff --git a/lib/bulk_imports/pipeline/context.rb b/lib/bulk_imports/pipeline/context.rb index dd121b2dbedcca835c36643970bfc5e3ad0dd2c9..8fdc32ced4da9f33c3d6ef042e34d1d2aa4eee04 100644 --- a/lib/bulk_imports/pipeline/context.rb +++ b/lib/bulk_imports/pipeline/context.rb @@ -3,11 +3,12 @@ module BulkImports module Pipeline class Context - attr_reader :entity, :bulk_import + attr_reader :tracker, :entity, :bulk_import attr_accessor :extra - def initialize(entity, extra = {}) - @entity = entity + def initialize(tracker, extra = {}) + @tracker = tracker + @entity = tracker.entity @bulk_import = entity.bulk_import @extra = extra end @@ -23,6 +24,14 @@ def current_user def configuration bulk_import.configuration end + + def has_next_page? + tracker.has_next_page? 
+ end + + def next_page + tracker.next_page + end end end end diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index d39f4121b51b66a1ec52ccc70e95fb6c0db1f8b7..fd6b065ed8bd28ec90f39794c217801b1bd7219c 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -26,7 +26,7 @@ def run end end - if respond_to?(:after_run) + if extracted_data.data.present? && respond_to?(:after_run) run_pipeline_step(:after_run) do after_run(extracted_data) end @@ -64,6 +64,7 @@ def extracted_data_from def mark_as_failed warn(message: 'Pipeline failed', pipeline_class: pipeline) + tracker.fail_op! context.entity.fail_op! end @@ -79,6 +80,8 @@ def log_skip(extra = {}) pipeline_class: pipeline }.merge(extra) + tracker.skip! + info(log) end @@ -92,6 +95,8 @@ def log_import_failure(exception, step) correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id } + logger.error(log_params(attributes)) + BulkImports::Failure.create(attributes) end