diff --git a/app/models/bulk_imports/entity.rb b/app/models/bulk_imports/entity.rb index 9127dab56a6d35a456fd6f141a3435cbde75bcc6..04af11457692dabb27d6ceb24734b84aeea14418 100644 --- a/app/models/bulk_imports/entity.rb +++ b/app/models/bulk_imports/entity.rb @@ -68,25 +68,6 @@ class BulkImports::Entity < ApplicationRecord end end - def update_tracker_for(relation:, has_next_page:, next_page: nil) - attributes = { - relation: relation, - has_next_page: has_next_page, - next_page: next_page, - bulk_import_entity_id: id - } - - trackers.upsert(attributes, unique_by: %i[bulk_import_entity_id relation]) - end - - def has_next_page?(relation) - trackers.find_by(relation: relation)&.has_next_page - end - - def next_page_for(relation) - trackers.find_by(relation: relation)&.next_page - end - private def validate_parent_is_a_group diff --git a/app/models/bulk_imports/tracker.rb b/app/models/bulk_imports/tracker.rb index 182c0bbaa8a24aa3aa67594b322e23e4a2ee8bbc..8d16edccd5b5cc85e1c1dbec3e5f6262263928a0 100644 --- a/app/models/bulk_imports/tracker.rb +++ b/app/models/bulk_imports/tracker.rb @@ -3,6 +3,8 @@ class BulkImports::Tracker < ApplicationRecord self.table_name = 'bulk_import_trackers' + alias_attribute :pipeline_name, :relation + belongs_to :entity, class_name: 'BulkImports::Entity', foreign_key: :bulk_import_entity_id, @@ -28,6 +30,10 @@ class BulkImports::Tracker < ApplicationRecord end event :finish do + # When applying the concurrent model, + # remove the created => finished transition + # https://gitlab.com/gitlab-org/gitlab/-/issues/323384 + transition created: :finished transition started: :finished transition failed: :failed transition skipped: :skipped diff --git a/changelogs/unreleased/kassio-bulkimports-track-pipeline-work.yml b/changelogs/unreleased/kassio-bulkimports-track-pipeline-work.yml new file mode 100644 index 0000000000000000000000000000000000000000..3665f0c995f8d2f737d1280d151bb4191726b6ba --- /dev/null +++ 
b/changelogs/unreleased/kassio-bulkimports-track-pipeline-work.yml @@ -0,0 +1,5 @@ +--- +title: 'BulkImports: Track pipeline worker with BulkImports::Tracker#status' +merge_request: 56242 +author: +type: changed diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb index 8aa9fb8da2f4b68d356d243475073f5e0053c107..d98acf6d3cf650df41522251cc764235120c150f 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb @@ -32,11 +32,10 @@ def to_s def variables(context) iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_award_emoji" { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(tracker), + cursor: context.tracker.next_page, epic_iid: iid } end diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb index 1e93d3220eadc0a555d8f48dc02f89caa9a8d43e..f19b5f4a251778d656b8e0c7989ed116df9256d6 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb @@ -34,11 +34,10 @@ def to_s def variables(context) iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_events" { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(tracker), + cursor: context.tracker.next_page, epic_iid: iid } end diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb index 9f8099ae4714409e5f5d245be001e1196b0f282d..f8c5707b5df30c24b0b5da7dec343ed0ccc2dc10 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb @@ -61,7 +61,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: 
context.entity.next_page_for(:epics) + cursor: context.tracker.next_page } end diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_iterations_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_iterations_query.rb index 4392f044ade7109d158fc15bea76eade46235d18..d49449e15ffba5f0d78cedcc755e0b9ed88e0452 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_iterations_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_iterations_query.rb @@ -35,7 +35,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:iterations) + cursor: context.tracker.next_page } end diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb index efc4e8f2e3c235302e6b6945dffcb6cdf4f6d4f7..1300ae872ccbdb225cdd684b17048e42be500f4a 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb @@ -25,11 +25,7 @@ def initialize(context) end def after_run(extracted_data) - iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_award_emoji" - - context.entity.update_tracker_for( - relation: tracker, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb index e0d558c78cd6169a2e97578cca48feb8cbb349be..5506cbbab8f135647c64be0f661d35479c57080d 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb @@ -48,11 +48,7 @@ def load(context, data) end def after_run(extracted_data) - iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_events" - - context.entity.update_tracker_for( - relation: tracker, + tracker.update( has_next_page: extracted_data.has_next_page?, 
next_page: extracted_data.next_page ) diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb index 61ab3b035bff13c2c39d9b098a18c0f96d4f64d1..0e12248fd40f3dd467fbadabcfb8b9d19c6f123c 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb @@ -24,8 +24,7 @@ def load(context, data) end def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :epics, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline.rb index 60e6aac2f2b451aa6e6ed7dbb41d1063a190c128..39f4e8f90e68e401b0ab27e26198689d147f956f 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline.rb @@ -21,8 +21,7 @@ def load(context, data) end def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :iterations, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query_spec.rb index 5b2be39b9c77e35fa2cc1478585e7d62fbd8bd2b..cdb5dcaab30092190a6b0268a6c31c46618cb32e 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query_spec.rb @@ -4,7 +4,7 @@ RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicAwardEmojiQuery do it 'has a valid query' do - context = BulkImports::Pipeline::Context.new(create(:bulk_import_entity), epic_iid: 1) + context = BulkImports::Pipeline::Context.new(create(:bulk_import_tracker), epic_iid: 1) result = 
GitlabSchema.execute( described_class.to_s, diff --git a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_events_query_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_events_query_spec.rb index 91359244f1d29a0841efe06b65e3167409d65206..42fab71c233966a738d879077df139ac5e9d6801 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_events_query_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_events_query_spec.rb @@ -4,7 +4,7 @@ RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicEventsQuery do it 'has a valid query' do - context = BulkImports::Pipeline::Context.new(create(:bulk_import_entity), epic_iid: 1) + context = BulkImports::Pipeline::Context.new(create(:bulk_import_tracker), epic_iid: 1) result = GitlabSchema.execute( described_class.to_s, diff --git a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epics_query_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epics_query_spec.rb index a666087cba2bda870b3ff17f2eab5f46802de625..ac82951bcad3814cec26dd0cf87f5580d130de0f 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epics_query_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epics_query_spec.rb @@ -4,7 +4,7 @@ RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicsQuery do it 'has a valid query' do - context = BulkImports::Pipeline::Context.new(create(:bulk_import_entity)) + context = BulkImports::Pipeline::Context.new(create(:bulk_import_tracker)) result = GitlabSchema.execute( described_class.to_s, diff --git a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_iterations_query_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_iterations_query_spec.rb index b0c2fb1f166db1dfc3550ed6e2c12515ae621141..203922e42c2271e68807a631bd7e4a6c3db2e471 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_iterations_query_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_iterations_query_spec.rb @@ -4,8 +4,8 @@ RSpec.describe 
EE::BulkImports::Groups::Graphql::GetIterationsQuery do it 'has a valid query' do - entity = create(:bulk_import_entity) - context = BulkImports::Pipeline::Context.new(entity) + tracker = create(:bulk_import_tracker) + context = BulkImports::Pipeline::Context.new(tracker) query = GraphQL::Query.new( GitlabSchema, diff --git a/ee/spec/lib/ee/bulk_imports/groups/loaders/epic_award_emoji_loader_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/loaders/epic_award_emoji_loader_spec.rb index 6610628197cb29516782f84a7a1ce3b5a8ca2cd5..382a65a5f799eae48b51616ab262ac0ef4a0e9fc 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/loaders/epic_award_emoji_loader_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/loaders/epic_award_emoji_loader_spec.rb @@ -9,7 +9,8 @@ let_it_be(:epic) { create(:epic, group: group, iid: 1) } let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } - let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } let_it_be(:data) do { diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline_spec.rb index 6f49319d902422615e162467c73d9214fdd1c959..4351e22d80f1f43f6d6de078674cacf284eb8164 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline_spec.rb @@ -7,8 +7,8 @@ let_it_be(:user) { create(:user) } let_it_be(:group) { create(:group) } let_it_be(:epic) { create(:epic, group: group) } - let_it_be(:tracker) { "epic_#{epic.iid}_award_emoji" } let_it_be(:bulk_import) { create(:bulk_import, user: user) } + let_it_be(:entity) do create( :bulk_import_entity, @@ -20,7 +20,8 @@ ) end - let_it_be(:context) { 
BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } before do stub_licensed_features(epics: true) @@ -39,7 +40,7 @@ describe '#run' do it 'imports epic award emoji' do - data = extractor_data(has_next_page: false, cursor: cursor) + data = extractor_data(has_next_page: false) allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow(extractor) @@ -61,10 +62,8 @@ subject.after_run(data) - page_tracker = entity.trackers.find_by(relation: tracker) - - expect(page_tracker.has_next_page).to eq(true) - expect(page_tracker.next_page).to eq(cursor) + expect(tracker.has_next_page).to eq(true) + expect(tracker.next_page).to eq(cursor) end end @@ -76,10 +75,8 @@ subject.after_run(data) - page_tracker = entity.trackers.find_by(relation: tracker) - - expect(page_tracker.has_next_page).to eq(false) - expect(page_tracker.next_page).to be_nil + expect(tracker.has_next_page).to eq(false) + expect(tracker.next_page).to be_nil end it 'updates context with next epic iid' do diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline_spec.rb index ecda06523c988083c67997e8296c05a3e7fb4ac4..e4f609043943f53182c4d8485419491dcaf07914 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline_spec.rb @@ -7,8 +7,8 @@ let_it_be(:user) { create(:user) } let_it_be(:group) { create(:group) } let_it_be(:epic) { create(:epic, group: group) } - let_it_be(:tracker) { "epic_#{epic.iid}_events" } let_it_be(:bulk_import) { create(:bulk_import, user: user) } + let_it_be(:entity) do create( :bulk_import_entity, @@ -20,7 +20,8 @@ ) end - let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { 
create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } before do stub_licensed_features(epics: true) @@ -39,7 +40,7 @@ describe '#run' do it 'imports epic events and resource state events' do - data = extractor_data(has_next_page: false, cursor: cursor) + data = extractor_data(has_next_page: false) allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow(extractor) @@ -102,10 +103,8 @@ subject.after_run(data) - page_tracker = entity.trackers.find_by(relation: tracker) - - expect(page_tracker.has_next_page).to eq(true) - expect(page_tracker.next_page).to eq(cursor) + expect(tracker.has_next_page).to eq(true) + expect(tracker.next_page).to eq(cursor) end end @@ -117,10 +116,8 @@ subject.after_run(data) - page_tracker = entity.trackers.find_by(relation: tracker) - - expect(page_tracker.has_next_page).to eq(false) - expect(page_tracker.next_page).to be_nil + expect(tracker.has_next_page).to eq(false) + expect(tracker.next_page).to be_nil end it 'updates context with next epic iid' do diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb index 514f4a2607dc9a8bdc5dd060322f8bb42d881bb3..8e9938d83b025ae2cccd02915b490dae413c9d29 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb @@ -6,8 +6,9 @@ let_it_be(:cursor) { 'cursor' } let_it_be(:user) { create(:user) } let_it_be(:group) { create(:group) } - let(:bulk_import) { create(:bulk_import, user: user) } - let(:entity) do + let_it_be(:bulk_import) { create(:bulk_import, user: user) } + + let_it_be(:entity) do create( :bulk_import_entity, group: group, @@ -18,7 +19,8 @@ ) end - let(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { 
BulkImports::Pipeline::Context.new(tracker) } before do stub_licensed_features(epics: true) @@ -116,8 +118,6 @@ subject.after_run(data) - tracker = entity.trackers.find_by(relation: :epics) - expect(tracker.has_next_page).to eq(true) expect(tracker.next_page).to eq(cursor) end @@ -131,8 +131,6 @@ subject.after_run(data) - tracker = entity.trackers.find_by(relation: :epics) - expect(tracker.has_next_page).to eq(false) expect(tracker.next_page).to be_nil end diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline_spec.rb index d2163ce86fbab2e08948cff1c5854a01cb579290..b6887e224ffbf01cd805a0a84e2680d52f2f8e04 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline_spec.rb @@ -9,7 +9,7 @@ let_it_be(:timestamp) { Time.new(2020, 01, 01).utc } let_it_be(:bulk_import) { create(:bulk_import, user: user) } - let(:entity) do + let_it_be(:entity) do create( :bulk_import_entity, bulk_import: bulk_import, @@ -20,7 +20,8 @@ ) end - let(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } subject { described_class.new(context) } @@ -85,8 +86,6 @@ def extracted_data(title:, has_next_page:, cursor: nil, start_date: Date.today) subject.after_run(data) - tracker = entity.trackers.find_by(relation: :iterations) - expect(tracker.has_next_page).to eq(true) expect(tracker.next_page).to eq(cursor) end @@ -100,8 +99,6 @@ def extracted_data(title:, has_next_page:, cursor: nil, start_date: Date.today) subject.after_run(data) - tracker = entity.trackers.find_by(relation: :iterations) - expect(tracker.has_next_page).to eq(false) expect(tracker.next_page).to be_nil end diff --git 
a/ee/spec/lib/ee/bulk_imports/groups/transformers/epic_attributes_transformer_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/transformers/epic_attributes_transformer_spec.rb index 0a28d21189b432b99ba77d14dbdc9c867bf2bd9d..997df43fff4f9ec8dbff27158a23a179a19612ed 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/transformers/epic_attributes_transformer_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/transformers/epic_attributes_transformer_spec.rb @@ -7,7 +7,8 @@ let_it_be(:group) { create(:group) } let_it_be(:bulk_import) { create(:bulk_import, user: importer_user) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } - let_it_be(:context) { ::BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } describe '#transform' do it 'transforms the epic attributes' do diff --git a/ee/spec/lib/ee/bulk_imports/importers/group_importer_spec.rb b/ee/spec/lib/ee/bulk_imports/importers/group_importer_spec.rb index 7b93a8ea1f23966837b518a3305759869c6fa70d..68aedb7830adce7fe3afb8a134ccb9238e53b3ce 100644 --- a/ee/spec/lib/ee/bulk_imports/importers/group_importer_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/importers/group_importer_spec.rb @@ -3,14 +3,14 @@ require 'spec_helper' RSpec.describe BulkImports::Importers::GroupImporter do - let(:user) { create(:user) } - let(:group) { create(:group) } - let(:bulk_import) { create(:bulk_import, user: user) } - let(:bulk_import_entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import, group: group) } - let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) } - let(:context) { BulkImports::Pipeline::Context.new(bulk_import_entity) } + let_it_be(:user) { create(:user) } + let_it_be(:group) { create(:group) } + let_it_be(:bulk_import) { create(:bulk_import, user: user) } + let_it_be(:entity) { create(:bulk_import_entity, 
:started, bulk_import: bulk_import, group: group) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } - subject { described_class.new(bulk_import_entity) } + subject { described_class.new(entity) } before do allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context) @@ -25,10 +25,12 @@ expect_to_run_pipeline BulkImports::Groups::Pipelines::MilestonesPipeline, context: context expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicsPipeline, context: context expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline, context: context + expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicEventsPipeline, context: context + expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::IterationsPipeline, context: context subject.execute - expect(bulk_import_entity.reload).to be_finished + expect(entity.reload).to be_finished end end diff --git a/lib/bulk_imports/groups/graphql/get_labels_query.rb b/lib/bulk_imports/groups/graphql/get_labels_query.rb index 23efbc335810199e81c0eb6367b7c4524d3bdd00..2d246a217a7bf4f4bb42666a6c3ac3b4aed2ccb2 100644 --- a/lib/bulk_imports/groups/graphql/get_labels_query.rb +++ b/lib/bulk_imports/groups/graphql/get_labels_query.rb @@ -31,7 +31,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:labels) + cursor: context.tracker.next_page } end diff --git a/lib/bulk_imports/groups/graphql/get_members_query.rb b/lib/bulk_imports/groups/graphql/get_members_query.rb index e3a78124a47cb1a7d29cf5afde70e4b02fcc0447..f3a0d77acc2f9441c16e4793831a3dc6a4a4f84a 100644 --- a/lib/bulk_imports/groups/graphql/get_members_query.rb +++ b/lib/bulk_imports/groups/graphql/get_members_query.rb @@ -34,7 +34,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:group_members) + cursor: 
context.tracker.next_page } end diff --git a/lib/bulk_imports/groups/graphql/get_milestones_query.rb b/lib/bulk_imports/groups/graphql/get_milestones_query.rb index 2ade87e6fa091d66052abd12cdb8bb36473ed321..c254b35054a212db9d34ddacb7c8a4638b7966d0 100644 --- a/lib/bulk_imports/groups/graphql/get_milestones_query.rb +++ b/lib/bulk_imports/groups/graphql/get_milestones_query.rb @@ -33,7 +33,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:milestones) + cursor: context.tracker.next_page } end diff --git a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb index 9f8b86827510ac6f4a3386fae9e10d4e5d0b742b..61d3e6c700e94998294e1e4d9a1b36ea814ac6c0 100644 --- a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb @@ -16,8 +16,7 @@ def load(context, data) end def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :labels, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/lib/bulk_imports/groups/pipelines/members_pipeline.rb b/lib/bulk_imports/groups/pipelines/members_pipeline.rb index 32fc931e8c33a20d9d98e0e9000d0dece8a86e01..d29bd74c5ae01eecd3dfb51f487c2c580ca0fc4c 100644 --- a/lib/bulk_imports/groups/pipelines/members_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/members_pipeline.rb @@ -19,8 +19,7 @@ def load(context, data) end def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :group_members, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb index 8497162e0e73c7bf7db003e54d8919c6a03031ac..eb51424c14a4ca053027c8b216debc638c4abb6e 100644 --- 
a/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb @@ -20,8 +20,7 @@ def load(context, data) end def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :milestones, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb index f016b552fd45e5ab35a494cd0444eaa156451513..ccc61ee787daf5685a649192106df934875510a3 100644 --- a/lib/bulk_imports/importers/group_importer.rb +++ b/lib/bulk_imports/importers/group_importer.rb @@ -8,9 +8,18 @@ def initialize(entity) end def execute - context = BulkImports::Pipeline::Context.new(entity) + pipelines.each.with_index do |pipeline, stage| + pipeline_tracker = entity.trackers.create!( + pipeline_name: pipeline, + stage: stage + ) - pipelines.each { |pipeline| pipeline.new(context).run } + context = BulkImports::Pipeline::Context.new(pipeline_tracker) + + pipeline.new(context).run + + pipeline_tracker.finish! + end entity.finish! 
end diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb index 1444516273798685bc6ae3a855ee4072d5e42d02..df4f020d6b2f1126d0b0761c32b910ec9750f92b 100644 --- a/lib/bulk_imports/pipeline.rb +++ b/lib/bulk_imports/pipeline.rb @@ -15,6 +15,10 @@ def initialize(context) @context = context end + def tracker + @tracker ||= context.tracker + end + included do private diff --git a/lib/bulk_imports/pipeline/context.rb b/lib/bulk_imports/pipeline/context.rb index dd121b2dbedcca835c36643970bfc5e3ad0dd2c9..3c69c729f36fbf001ec8f975fa2a9f1010d874ef 100644 --- a/lib/bulk_imports/pipeline/context.rb +++ b/lib/bulk_imports/pipeline/context.rb @@ -3,25 +3,33 @@ module BulkImports module Pipeline class Context - attr_reader :entity, :bulk_import attr_accessor :extra - def initialize(entity, extra = {}) - @entity = entity - @bulk_import = entity.bulk_import + attr_reader :tracker + + def initialize(tracker, extra = {}) + @tracker = tracker @extra = extra end + def entity + @entity ||= tracker.entity + end + def group - entity.group + @group ||= entity.group + end + + def bulk_import + @bulk_import ||= entity.bulk_import end def current_user - bulk_import.user + @current_user ||= bulk_import.user end def configuration - bulk_import.configuration + @configuration ||= bulk_import.configuration end end end diff --git a/lib/bulk_imports/pipeline/extracted_data.rb b/lib/bulk_imports/pipeline/extracted_data.rb index 685a91a4afeb757b5750da7340b56f5ca031b19f..15b5caa1a47894b44c82cd180b63ab7d46ac0f01 100644 --- a/lib/bulk_imports/pipeline/extracted_data.rb +++ b/lib/bulk_imports/pipeline/extracted_data.rb @@ -11,11 +11,14 @@ def initialize(data: nil, page_info: {}) end def has_next_page? 
- @page_info['has_next_page'] + Gitlab::Utils.to_boolean( + @page_info&.dig('has_next_page'), + default: false + ) end def next_page - @page_info['end_cursor'] + @page_info&.dig('end_cursor') end def each(&block) diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index e3535e585cc32b0e6b59c248f94d44d8607f45f5..588d2c87209472fb29ef01de34236932ae95f9ee 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -26,7 +26,7 @@ def run end end - if respond_to?(:after_run) + if extracted_data && respond_to?(:after_run) run_pipeline_step(:after_run) do after_run(extracted_data) end @@ -34,7 +34,7 @@ def run info(message: 'Pipeline finished') rescue MarkedAsFailedError - log_skip + skip!('Skipping pipeline due to failed entity') end private # rubocop:disable Lint/UselessAccessModifier @@ -46,7 +46,11 @@ def run_pipeline_step(step, class_name = nil) yield rescue MarkedAsFailedError - log_skip(step => class_name) + skip!( + 'Skipping pipeline due to failed entity', + pipeline_step: step, + step_class: class_name + ) rescue => e log_import_failure(e, step) @@ -65,10 +69,13 @@ def mark_as_failed warn(message: 'Pipeline failed') context.entity.fail_op! + tracker.fail_op! end - def log_skip(extra = {}) - info({ message: 'Skipping due to failed pipeline status' }.merge(extra)) + def skip!(message, extra = {}) + warn({ message: message }.merge(extra)) + + tracker.skip! 
end def log_import_failure(exception, step) diff --git a/spec/lib/bulk_imports/common/transformers/user_reference_transformer_spec.rb b/spec/lib/bulk_imports/common/transformers/user_reference_transformer_spec.rb index ff11a10bfe9a96ba1467f2d63d8fd2fd7359fb03..e86a584d38a60105f6f58eae68753f53d492dec3 100644 --- a/spec/lib/bulk_imports/common/transformers/user_reference_transformer_spec.rb +++ b/spec/lib/bulk_imports/common/transformers/user_reference_transformer_spec.rb @@ -8,7 +8,8 @@ let_it_be(:group) { create(:group) } let_it_be(:bulk_import) { create(:bulk_import) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } - let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } let(:hash) do { diff --git a/spec/lib/bulk_imports/groups/extractors/subgroups_extractor_spec.rb b/spec/lib/bulk_imports/groups/extractors/subgroups_extractor_spec.rb index 627247c04abb8b24a39fa1e333dfdd92daafeef9..ac8786440e9e01fead163522d53dab17a35ec50f 100644 --- a/spec/lib/bulk_imports/groups/extractors/subgroups_extractor_spec.rb +++ b/spec/lib/bulk_imports/groups/extractors/subgroups_extractor_spec.rb @@ -8,8 +8,9 @@ bulk_import = create(:bulk_import) create(:bulk_import_configuration, bulk_import: bulk_import) entity = create(:bulk_import_entity, bulk_import: bulk_import) + tracker = create(:bulk_import_tracker, entity: entity) response = [{ 'test' => 'group' }] - context = BulkImports::Pipeline::Context.new(entity) + context = BulkImports::Pipeline::Context.new(tracker) allow_next_instance_of(BulkImports::Clients::Http) do |client| allow(client).to receive(:each_page).and_return(response) diff --git a/spec/lib/bulk_imports/groups/graphql/get_group_query_spec.rb b/spec/lib/bulk_imports/groups/graphql/get_group_query_spec.rb index 
ef46da7062b746a2c7483d60452dd330aa15a798..b0f8f74783b923ece6df128ae355164f7c34630e 100644 --- a/spec/lib/bulk_imports/groups/graphql/get_group_query_spec.rb +++ b/spec/lib/bulk_imports/groups/graphql/get_group_query_spec.rb @@ -4,10 +4,10 @@ RSpec.describe BulkImports::Groups::Graphql::GetGroupQuery do describe '#variables' do - let(:entity) { double(source_full_path: 'test', bulk_import: nil) } - let(:context) { BulkImports::Pipeline::Context.new(entity) } - it 'returns query variables based on entity information' do + entity = double(source_full_path: 'test', bulk_import: nil) + tracker = double(entity: entity) + context = BulkImports::Pipeline::Context.new(tracker) expected = { full_path: entity.source_full_path } expect(described_class.variables(context)).to eq(expected) diff --git a/spec/lib/bulk_imports/groups/graphql/get_labels_query_spec.rb b/spec/lib/bulk_imports/groups/graphql/get_labels_query_spec.rb index 85f82be7d18ef10e4cfe410f3f2fbc534307a257..61db644a3721921207e538f0955abd6cace8698d 100644 --- a/spec/lib/bulk_imports/groups/graphql/get_labels_query_spec.rb +++ b/spec/lib/bulk_imports/groups/graphql/get_labels_query_spec.rb @@ -4,8 +4,8 @@ RSpec.describe BulkImports::Groups::Graphql::GetLabelsQuery do it 'has a valid query' do - entity = create(:bulk_import_entity) - context = BulkImports::Pipeline::Context.new(entity) + tracker = create(:bulk_import_tracker) + context = BulkImports::Pipeline::Context.new(tracker) query = GraphQL::Query.new( GitlabSchema, diff --git a/spec/lib/bulk_imports/groups/graphql/get_members_query_spec.rb b/spec/lib/bulk_imports/groups/graphql/get_members_query_spec.rb index 5d05f5a2d30628055724da20e78aa29554067705..d0c4bb817b2c16d0b8be771d77d4fab3b5b67b0e 100644 --- a/spec/lib/bulk_imports/groups/graphql/get_members_query_spec.rb +++ b/spec/lib/bulk_imports/groups/graphql/get_members_query_spec.rb @@ -4,8 +4,8 @@ RSpec.describe BulkImports::Groups::Graphql::GetMembersQuery do it 'has a valid query' do - entity = 
create(:bulk_import_entity) - context = BulkImports::Pipeline::Context.new(entity) + tracker = create(:bulk_import_tracker) + context = BulkImports::Pipeline::Context.new(tracker) query = GraphQL::Query.new( GitlabSchema, diff --git a/spec/lib/bulk_imports/groups/graphql/get_milestones_query_spec.rb b/spec/lib/bulk_imports/groups/graphql/get_milestones_query_spec.rb index a38505fbf85f39bb8a93b9c7c03070df4c2aa3be..7a0f964c5f3537a3aa5e95d185de70371ffc5b65 100644 --- a/spec/lib/bulk_imports/groups/graphql/get_milestones_query_spec.rb +++ b/spec/lib/bulk_imports/groups/graphql/get_milestones_query_spec.rb @@ -4,8 +4,8 @@ RSpec.describe BulkImports::Groups::Graphql::GetMilestonesQuery do it 'has a valid query' do - entity = create(:bulk_import_entity) - context = BulkImports::Pipeline::Context.new(entity) + tracker = create(:bulk_import_tracker) + context = BulkImports::Pipeline::Context.new(tracker) query = GraphQL::Query.new( GitlabSchema, diff --git a/spec/lib/bulk_imports/groups/loaders/group_loader_spec.rb b/spec/lib/bulk_imports/groups/loaders/group_loader_spec.rb index 183292722d2d08bc81a435102de6c0d6c88c22a6..533955b057cf8ce5f0851013a45c3dd7b3b4b494 100644 --- a/spec/lib/bulk_imports/groups/loaders/group_loader_spec.rb +++ b/spec/lib/bulk_imports/groups/loaders/group_loader_spec.rb @@ -4,12 +4,13 @@ RSpec.describe BulkImports::Groups::Loaders::GroupLoader do describe '#load' do - let(:user) { create(:user) } - let(:data) { { foo: :bar } } + let_it_be(:user) { create(:user) } + let_it_be(:bulk_import) { create(:bulk_import, user: user) } + let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } let(:service_double) { instance_double(::Groups::CreateService) } - let(:bulk_import) { create(:bulk_import, user: user) } - let(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) } - let(:context) { 
BulkImports::Pipeline::Context.new(entity) } + let(:data) { { foo: :bar } } subject { described_class.new } diff --git a/spec/lib/bulk_imports/groups/pipelines/group_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/group_pipeline_spec.rb index 61950cdd9b016ea548823273eddfb93c04db5605..39e782dc093e2adf2e6cc61d56753e214251b193 100644 --- a/spec/lib/bulk_imports/groups/pipelines/group_pipeline_spec.rb +++ b/spec/lib/bulk_imports/groups/pipelines/group_pipeline_spec.rb @@ -4,10 +4,11 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do describe '#run' do - let(:user) { create(:user) } - let(:parent) { create(:group) } - let(:bulk_import) { create(:bulk_import, user: user) } - let(:entity) do + let_it_be(:user) { create(:user) } + let_it_be(:parent) { create(:group) } + let_it_be(:bulk_import) { create(:bulk_import, user: user) } + + let_it_be(:entity) do create( :bulk_import_entity, bulk_import: bulk_import, @@ -17,7 +18,8 @@ ) end - let(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } let(:group_data) do { @@ -37,7 +39,7 @@ before do allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| - allow(extractor).to receive(:extract).and_return([group_data]) + allow(extractor).to receive(:extract).and_return(BulkImports::Pipeline::ExtractedData.new(data: group_data)) end parent.add_owner(user) diff --git a/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb index 3327a30f1d515f9b0605f775f77d8031e717ee78..80ad5b69a61f3a5e5ab1f00289d2fd8d229edf52 100644 --- a/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb +++ b/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb @@ -3,11 +3,12 @@ require 'spec_helper' RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do - 
let(:user) { create(:user) } - let(:group) { create(:group) } - let(:cursor) { 'cursor' } - let(:timestamp) { Time.new(2020, 01, 01).utc } - let(:entity) do + let_it_be(:user) { create(:user) } + let_it_be(:group) { create(:group) } + let_it_be(:cursor) { 'cursor' } + let_it_be(:timestamp) { Time.new(2020, 01, 01).utc } + + let_it_be(:entity) do create( :bulk_import_entity, source_full_path: 'source/full/path', @@ -17,7 +18,8 @@ ) end - let(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } subject { described_class.new(context) } @@ -72,8 +74,6 @@ def extractor_data(title:, has_next_page:, cursor: nil) subject.after_run(data) - tracker = entity.trackers.find_by(relation: :labels) - expect(tracker.has_next_page).to eq(true) expect(tracker.next_page).to eq(cursor) end @@ -87,8 +87,6 @@ def extractor_data(title:, has_next_page:, cursor: nil) subject.after_run(data) - tracker = entity.trackers.find_by(relation: :labels) - expect(tracker.has_next_page).to eq(false) expect(tracker.next_page).to be_nil end diff --git a/spec/lib/bulk_imports/groups/pipelines/members_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/members_pipeline_spec.rb index 74d3e09d263cc91981dfc7a9a232107e976550f2..5c82a028751cfb3c43ebcccf2f9e0d7d91b815a6 100644 --- a/spec/lib/bulk_imports/groups/pipelines/members_pipeline_spec.rb +++ b/spec/lib/bulk_imports/groups/pipelines/members_pipeline_spec.rb @@ -11,7 +11,8 @@ let_it_be(:cursor) { 'cursor' } let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } - let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } subject { described_class.new(context) } diff 
--git a/spec/lib/bulk_imports/groups/pipelines/milestones_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/milestones_pipeline_spec.rb index f0c34c65257df6d9e43a42b90d26b023ef334e4d..15a64a70ff3ea427d64acaafdd9a92e78db92f1a 100644 --- a/spec/lib/bulk_imports/groups/pipelines/milestones_pipeline_spec.rb +++ b/spec/lib/bulk_imports/groups/pipelines/milestones_pipeline_spec.rb @@ -9,7 +9,7 @@ let_it_be(:timestamp) { Time.new(2020, 01, 01).utc } let_it_be(:bulk_import) { create(:bulk_import, user: user) } - let(:entity) do + let_it_be(:entity) do create( :bulk_import_entity, bulk_import: bulk_import, @@ -20,7 +20,8 @@ ) end - let(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } subject { described_class.new(context) } @@ -84,8 +85,6 @@ def extracted_data(title:, has_next_page:, cursor: nil) subject.after_run(data) - tracker = entity.trackers.find_by(relation: :milestones) - expect(tracker.has_next_page).to eq(true) expect(tracker.next_page).to eq(cursor) end @@ -99,8 +98,6 @@ def extracted_data(title:, has_next_page:, cursor: nil) subject.after_run(data) - tracker = entity.trackers.find_by(relation: :milestones) - expect(tracker.has_next_page).to eq(false) expect(tracker.next_page).to be_nil end diff --git a/spec/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline_spec.rb index 2a99646bb4ae2bc5997f846fdbeeea84cac36dd9..fd7265aea34fd3cb7e694494fe041b9283db022c 100644 --- a/spec/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline_spec.rb +++ b/spec/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline_spec.rb @@ -6,19 +6,13 @@ let_it_be(:user) { create(:user) } let_it_be(:group) { create(:group, path: 'group') } let_it_be(:parent) { create(:group, name: 'imported-group', path: 'imported-group') } - let(:context) { 
BulkImports::Pipeline::Context.new(parent_entity) } + let_it_be(:parent_entity) { create(:bulk_import_entity, destination_namespace: parent.full_path, group: parent) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: parent_entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } subject { described_class.new(context) } describe '#run' do - let!(:parent_entity) do - create( - :bulk_import_entity, - destination_namespace: parent.full_path, - group: parent - ) - end - let(:subgroup_data) do [ { diff --git a/spec/lib/bulk_imports/groups/transformers/group_attributes_transformer_spec.rb b/spec/lib/bulk_imports/groups/transformers/group_attributes_transformer_spec.rb index b3fe8a2ba25a18a7704e7694c2584f4cca52619b..75d8c15088ad82d4e0cb8105f93f4c9ce9b64d9a 100644 --- a/spec/lib/bulk_imports/groups/transformers/group_attributes_transformer_spec.rb +++ b/spec/lib/bulk_imports/groups/transformers/group_attributes_transformer_spec.rb @@ -4,11 +4,12 @@ RSpec.describe BulkImports::Groups::Transformers::GroupAttributesTransformer do describe '#transform' do - let(:user) { create(:user) } - let(:parent) { create(:group) } - let(:group) { create(:group, name: 'My Source Group', parent: parent) } - let(:bulk_import) { create(:bulk_import, user: user) } - let(:entity) do + let_it_be(:user) { create(:user) } + let_it_be(:parent) { create(:group) } + let_it_be(:group) { create(:group, name: 'My Source Group', parent: parent) } + let_it_be(:bulk_import) { create(:bulk_import, user: user) } + + let_it_be(:entity) do create( :bulk_import_entity, bulk_import: bulk_import, @@ -18,7 +19,8 @@ ) end - let(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } let(:data) do { @@ -82,14 +84,7 @@ context 'when destination namespace is empty' do it 'does not set parent id' do - entity = create( - :bulk_import_entity, - 
bulk_import: bulk_import, - source_full_path: 'source/full/path', - destination_name: group.name, - destination_namespace: '' - ) - context = BulkImports::Pipeline::Context.new(entity) + entity.update!(destination_namespace: '') transformed_data = subject.transform(context, data) diff --git a/spec/lib/bulk_imports/groups/transformers/member_attributes_transformer_spec.rb b/spec/lib/bulk_imports/groups/transformers/member_attributes_transformer_spec.rb index f66c67fc6a2b8082ccd9c98acb890aad7c421da1..f3905a4b6e4a9f247423b8c52c282dde5af83003 100644 --- a/spec/lib/bulk_imports/groups/transformers/member_attributes_transformer_spec.rb +++ b/spec/lib/bulk_imports/groups/transformers/member_attributes_transformer_spec.rb @@ -8,7 +8,8 @@ let_it_be(:group) { create(:group) } let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } - let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } it 'returns nil when receives no data' do expect(subject.transform(context, nil)).to eq(nil) diff --git a/spec/lib/bulk_imports/importers/group_importer_spec.rb b/spec/lib/bulk_imports/importers/group_importer_spec.rb index 5d501b49e41aad212a006d1c86428763b2bda44d..9579ee532510929453877387bf2093e27029f8b8 100644 --- a/spec/lib/bulk_imports/importers/group_importer_spec.rb +++ b/spec/lib/bulk_imports/importers/group_importer_spec.rb @@ -3,18 +3,18 @@ require 'spec_helper' RSpec.describe BulkImports::Importers::GroupImporter do - let(:user) { create(:user) } - let(:group) { create(:group) } - let(:bulk_import) { create(:bulk_import) } - let(:bulk_import_entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import, group: group) } - let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) } - 
let(:context) { BulkImports::Pipeline::Context.new(bulk_import_entity) } + let_it_be(:user) { create(:user) } + let_it_be(:group) { create(:group) } + let_it_be(:bulk_import) { create(:bulk_import) } + let_it_be(:entity) { create(:bulk_import_entity, :started, group: group) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity, pipeline_name: described_class.name) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } before do allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context) end - subject { described_class.new(bulk_import_entity) } + subject { described_class.new(entity) } describe '#execute' do it 'starts the entity and run its pipelines' do @@ -33,18 +33,18 @@ subject.execute - expect(bulk_import_entity.reload).to be_finished + expect(entity).to be_finished end context 'when failed' do - let(:bulk_import_entity) { create(:bulk_import_entity, :failed, bulk_import: bulk_import, group: group) } + let(:entity) { create(:bulk_import_entity, :failed, bulk_import: bulk_import, group: group) } it 'does not transition entity to finished state' do - allow(bulk_import_entity).to receive(:start!) + allow(entity).to receive(:start!) 
subject.execute - expect(bulk_import_entity.reload).to be_failed + expect(entity.reload).to be_failed end end end diff --git a/spec/lib/bulk_imports/pipeline/context_spec.rb b/spec/lib/bulk_imports/pipeline/context_spec.rb index c8c3fe3a861db36850755dbce664e710016c9142..5b7711ad5d7133d0e650a31a95a7b70b964c8c2c 100644 --- a/spec/lib/bulk_imports/pipeline/context_spec.rb +++ b/spec/lib/bulk_imports/pipeline/context_spec.rb @@ -3,29 +3,52 @@ require 'spec_helper' RSpec.describe BulkImports::Pipeline::Context do - let(:group) { instance_double(Group) } - let(:user) { instance_double(User) } - let(:bulk_import) { instance_double(BulkImport, user: user, configuration: :config) } - - let(:entity) do - instance_double( - BulkImports::Entity, - bulk_import: bulk_import, - group: group + let_it_be(:user) { create(:user) } + let_it_be(:group) { create(:group) } + let_it_be(:bulk_import) { create(:bulk_import, user: user) } + + let_it_be(:entity) do + create( + :bulk_import_entity, + source_full_path: 'source/full/path', + destination_name: 'My Destination Group', + destination_namespace: group.full_path, + group: group, + bulk_import: bulk_import + ) + end + + let_it_be(:tracker) do + create( + :bulk_import_tracker, + entity: entity, + pipeline_name: described_class.name ) end - subject { described_class.new(entity) } + subject { described_class.new(tracker, extra: :data) } + + describe '#entity' do + it { expect(subject.entity).to eq(entity) } + end describe '#group' do it { expect(subject.group).to eq(group) } end + describe '#bulk_import' do + it { expect(subject.bulk_import).to eq(bulk_import) } + end + describe '#current_user' do it { expect(subject.current_user).to eq(user) } end - describe '#current_user' do + describe '#configuration' do it { expect(subject.configuration).to eq(bulk_import.configuration) } end + + describe '#extra' do + it { expect(subject.extra).to eq(extra: :data) } + end end diff --git a/spec/lib/bulk_imports/pipeline/runner_spec.rb 
b/spec/lib/bulk_imports/pipeline/runner_spec.rb index 59f01c9caaa5dc74b7d4df78ebc100caeca67021..29fd1ee3ffcf9c341fd7d6ec455727f14a8d4c48 100644 --- a/spec/lib/bulk_imports/pipeline/runner_spec.rb +++ b/spec/lib/bulk_imports/pipeline/runner_spec.rb @@ -45,8 +45,9 @@ def after_run(_); end stub_const('BulkImports::MyPipeline', pipeline) end - let_it_be_with_refind(:entity) { create(:bulk_import_entity) } - let(:context) { BulkImports::Pipeline::Context.new(entity, extra: :data) } + let_it_be_with_reload(:entity) { create(:bulk_import_entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker, extra: :data) } subject { BulkImports::MyPipeline.new(context) } @@ -170,12 +171,7 @@ def after_run(_); end BulkImports::MyPipeline.abort_on_failure! end - it 'marks entity as failed' do - expect { subject.run } - .to change(entity, :status_name).to(:failed) - end - - it 'logs warn message' do + it 'logs a warn message and marks entity as failed' do expect_next_instance_of(Gitlab::Import::Logger) do |logger| expect(logger).to receive(:warn) .with( @@ -188,6 +184,9 @@ def after_run(_); end end subject.run + + expect(entity.status_name).to eq(:failed) + expect(tracker.status_name).to eq(:failed) end end @@ -206,11 +205,11 @@ def after_run(_); end entity.fail_op! 
expect_next_instance_of(Gitlab::Import::Logger) do |logger| - expect(logger).to receive(:info) + expect(logger).to receive(:warn) .with( log_params( context, - message: 'Skipping due to failed pipeline status', + message: 'Skipping pipeline due to failed entity', pipeline_class: 'BulkImports::MyPipeline' ) ) diff --git a/spec/lib/bulk_imports/pipeline_spec.rb b/spec/lib/bulk_imports/pipeline_spec.rb index c882e3d26eaed4f6aafe19c6b4c24ab61d1a5a5b..dda2e41f06cb0e0b98e2848b56fab275a66638b3 100644 --- a/spec/lib/bulk_imports/pipeline_spec.rb +++ b/spec/lib/bulk_imports/pipeline_spec.rb @@ -3,6 +3,8 @@ require 'spec_helper' RSpec.describe BulkImports::Pipeline do + let(:context) { instance_double(BulkImports::Pipeline::Context, tracker: nil) } + before do stub_const('BulkImports::Extractor', Class.new) stub_const('BulkImports::Transformer', Class.new) @@ -44,7 +46,7 @@ def load; end end it 'returns itself when retrieving extractor & loader' do - pipeline = BulkImports::AnotherPipeline.new(nil) + pipeline = BulkImports::AnotherPipeline.new(context) expect(pipeline.send(:extractor)).to eq(pipeline) expect(pipeline.send(:loader)).to eq(pipeline) @@ -83,7 +85,7 @@ def load; end expect(BulkImports::Transformer).to receive(:new).with(foo: :bar) expect(BulkImports::Loader).to receive(:new).with(foo: :bar) - pipeline = BulkImports::MyPipeline.new(nil) + pipeline = BulkImports::MyPipeline.new(context) pipeline.send(:extractor) pipeline.send(:transformers) @@ -109,7 +111,7 @@ def load; end expect(BulkImports::Transformer).to receive(:new).with(no_args) expect(BulkImports::Loader).to receive(:new).with(no_args) - pipeline = BulkImports::NoOptionsPipeline.new(nil) + pipeline = BulkImports::NoOptionsPipeline.new(context) pipeline.send(:extractor) pipeline.send(:transformers) @@ -135,7 +137,7 @@ def transform; end transformer = double allow(BulkImports::Transformer).to receive(:new).and_return(transformer) - pipeline = BulkImports::TransformersPipeline.new(nil) + pipeline = 
BulkImports::TransformersPipeline.new(context) expect(pipeline.send(:transformers)).to eq([pipeline, transformer]) end diff --git a/spec/models/bulk_imports/entity_spec.rb b/spec/models/bulk_imports/entity_spec.rb index 17ab4d5954cc66bf9d8caf5e239781176b6d9115..652ea431696f2963af6af013833516b54e8cae8f 100644 --- a/spec/models/bulk_imports/entity_spec.rb +++ b/spec/models/bulk_imports/entity_spec.rb @@ -125,68 +125,4 @@ end end end - - describe "#update_tracker_for" do - let(:entity) { create(:bulk_import_entity) } - - it "inserts new tracker when it does not exist" do - expect do - entity.update_tracker_for(relation: :relation, has_next_page: false) - end.to change(BulkImports::Tracker, :count).by(1) - - tracker = entity.trackers.last - - expect(tracker.relation).to eq('relation') - expect(tracker.has_next_page).to eq(false) - expect(tracker.next_page).to eq(nil) - end - - it "updates the tracker if it already exist" do - create( - :bulk_import_tracker, - relation: :relation, - has_next_page: false, - entity: entity - ) - - expect do - entity.update_tracker_for(relation: :relation, has_next_page: true, next_page: 'nextPage') - end.not_to change(BulkImports::Tracker, :count) - - tracker = entity.trackers.last - - expect(tracker.relation).to eq('relation') - expect(tracker.has_next_page).to eq(true) - expect(tracker.next_page).to eq('nextPage') - end - end - - describe "#has_next_page?" 
do - it "queries for the given relation if it has more pages to be fetched" do - entity = create(:bulk_import_entity) - create( - :bulk_import_tracker, - relation: :relation, - has_next_page: false, - entity: entity - ) - - expect(entity.has_next_page?(:relation)).to eq(false) - end - end - - describe "#next_page_for" do - it "queries for the next page of the given relation" do - entity = create(:bulk_import_entity) - create( - :bulk_import_tracker, - relation: :relation, - has_next_page: false, - next_page: 'nextPage', - entity: entity - ) - - expect(entity.next_page_for(:relation)).to eq('nextPage') - end - end end