From fe9bc7b1e07d675e25a9ec28a94b472a661f3ca9 Mon Sep 17 00:00:00 2001 From: Kassio Borges Date: Fri, 12 Mar 2021 21:06:38 +0000 Subject: [PATCH] BulkImports: Track pipeline work with BulkImports::Tracker#status = Context Create a database representation, with the `BulkImports::Tracker`, for each BulkImport Pipeline (`include BulkImports::Pipeline`). This way, the pipeline progress can be tracked using the `status` column (state machine). With this change we can run each pipeline on its own background job (https://gitlab.com/gitlab-org/gitlab/-/issues/323384), which will bring the power of retrying the pipeline in case of failures, like rate limiting. Besides that, having each pipeline progress in the database will be handy to create a better UI to give an accurate sense of progress to the user in the future. - Merge request that introduced the `status` column to `BulkImports::Tracker`: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/5568 = The change Before this change, `BulkImports::Tracker` was tracking only the pagination status, when required. So, a record was only created when the pipeline required pagination handling. Now, before running a pipeline, a `BulkImports::Tracker` record is created, and when the pipeline is finished, failed or skipped, these statuses are also updated/tracked in the pipeline's `BulkImports::Tracker` record. The pipeline status is required to run the pipelines in individual background jobs because the pipelines have an order to run. For instance, we cannot import Epics before importing the Group or the Group labels. - Related to: https://gitlab.com/gitlab-org/gitlab/-/issues/324109 = Next step Create the `BulkImports::PipelineWorker` to run each pipeline on its own job. 
- https://gitlab.com/gitlab-org/gitlab/-/issues/323384 = References - Epic: https://gitlab.com/groups/gitlab-org/-/epics/5544 - Spike where the idea was tested: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/54970 --- app/models/bulk_imports/entity.rb | 19 ------ app/models/bulk_imports/tracker.rb | 6 ++ ...kassio-bulkimports-track-pipeline-work.yml | 5 ++ .../graphql/get_epic_award_emoji_query.rb | 3 +- .../groups/graphql/get_epic_events_query.rb | 3 +- .../groups/graphql/get_epics_query.rb | 2 +- .../groups/graphql/get_iterations_query.rb | 2 +- .../pipelines/epic_award_emoji_pipeline.rb | 6 +- .../groups/pipelines/epic_events_pipeline.rb | 6 +- .../groups/pipelines/epics_pipeline.rb | 3 +- .../groups/pipelines/iterations_pipeline.rb | 3 +- .../get_epic_award_emoji_query_spec.rb | 2 +- .../graphql/get_epic_events_query_spec.rb | 2 +- .../groups/graphql/get_epics_query_spec.rb | 2 +- .../graphql/get_iterations_query_spec.rb | 4 +- .../loaders/epic_award_emoji_loader_spec.rb | 3 +- .../epic_award_emoji_pipeline_spec.rb | 19 +++--- .../pipelines/epic_events_pipeline_spec.rb | 19 +++--- .../groups/pipelines/epics_pipeline_spec.rb | 12 ++-- .../pipelines/iterations_pipeline_spec.rb | 9 +-- .../epic_attributes_transformer_spec.rb | 3 +- .../importers/group_importer_spec.rb | 18 +++--- .../groups/graphql/get_labels_query.rb | 2 +- .../groups/graphql/get_members_query.rb | 2 +- .../groups/graphql/get_milestones_query.rb | 2 +- .../groups/pipelines/labels_pipeline.rb | 3 +- .../groups/pipelines/members_pipeline.rb | 3 +- .../groups/pipelines/milestones_pipeline.rb | 3 +- lib/bulk_imports/importers/group_importer.rb | 13 +++- lib/bulk_imports/pipeline.rb | 4 ++ lib/bulk_imports/pipeline/context.rb | 22 +++++-- lib/bulk_imports/pipeline/extracted_data.rb | 7 +- lib/bulk_imports/pipeline/runner.rb | 17 +++-- .../user_reference_transformer_spec.rb | 3 +- .../extractors/subgroups_extractor_spec.rb | 3 +- .../groups/graphql/get_group_query_spec.rb | 6 +- 
.../groups/graphql/get_labels_query_spec.rb | 4 +- .../groups/graphql/get_members_query_spec.rb | 4 +- .../graphql/get_milestones_query_spec.rb | 4 +- .../groups/loaders/group_loader_spec.rb | 11 ++-- .../groups/pipelines/group_pipeline_spec.rb | 14 ++-- .../groups/pipelines/labels_pipeline_spec.rb | 18 +++--- .../groups/pipelines/members_pipeline_spec.rb | 3 +- .../pipelines/milestones_pipeline_spec.rb | 9 +-- .../subgroup_entities_pipeline_spec.rb | 12 +--- .../group_attributes_transformer_spec.rb | 23 +++---- .../member_attributes_transformer_spec.rb | 3 +- .../importers/group_importer_spec.rb | 22 +++---- .../lib/bulk_imports/pipeline/context_spec.rb | 45 +++++++++---- spec/lib/bulk_imports/pipeline/runner_spec.rb | 19 +++--- spec/lib/bulk_imports/pipeline_spec.rb | 10 +-- spec/models/bulk_imports/entity_spec.rb | 64 ------------------- 52 files changed, 229 insertions(+), 277 deletions(-) create mode 100644 changelogs/unreleased/kassio-bulkimports-track-pipeline-work.yml diff --git a/app/models/bulk_imports/entity.rb b/app/models/bulk_imports/entity.rb index 9127dab56a6d35..04af11457692da 100644 --- a/app/models/bulk_imports/entity.rb +++ b/app/models/bulk_imports/entity.rb @@ -68,25 +68,6 @@ class BulkImports::Entity < ApplicationRecord end end - def update_tracker_for(relation:, has_next_page:, next_page: nil) - attributes = { - relation: relation, - has_next_page: has_next_page, - next_page: next_page, - bulk_import_entity_id: id - } - - trackers.upsert(attributes, unique_by: %i[bulk_import_entity_id relation]) - end - - def has_next_page?(relation) - trackers.find_by(relation: relation)&.has_next_page - end - - def next_page_for(relation) - trackers.find_by(relation: relation)&.next_page - end - private def validate_parent_is_a_group diff --git a/app/models/bulk_imports/tracker.rb b/app/models/bulk_imports/tracker.rb index 182c0bbaa8a24a..8d16edccd5b5cc 100644 --- a/app/models/bulk_imports/tracker.rb +++ b/app/models/bulk_imports/tracker.rb @@ -3,6 +3,8 @@ 
class BulkImports::Tracker < ApplicationRecord self.table_name = 'bulk_import_trackers' + alias_attribute :pipeline_name, :relation + belongs_to :entity, class_name: 'BulkImports::Entity', foreign_key: :bulk_import_entity_id, @@ -28,6 +30,10 @@ class BulkImports::Tracker < ApplicationRecord end event :finish do + # When applying the concurrent model, + # remove the created => finished transaction + # https://gitlab.com/gitlab-org/gitlab/-/issues/323384 + transition created: :finished transition started: :finished transition failed: :failed transition skipped: :skipped diff --git a/changelogs/unreleased/kassio-bulkimports-track-pipeline-work.yml b/changelogs/unreleased/kassio-bulkimports-track-pipeline-work.yml new file mode 100644 index 00000000000000..3665f0c995f8d2 --- /dev/null +++ b/changelogs/unreleased/kassio-bulkimports-track-pipeline-work.yml @@ -0,0 +1,5 @@ +--- +title: 'BulkImports: Track pipeline worker with BulkImports::Tracker#status' +merge_request: 56242 +author: +type: changed diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb index 8aa9fb8da2f4b6..d98acf6d3cf650 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query.rb @@ -32,11 +32,10 @@ def to_s def variables(context) iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_award_emoji" { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(tracker), + cursor: context.tracker.next_page, epic_iid: iid } end diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb index 1e93d3220eadc0..f19b5f4a251778 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epic_events_query.rb @@ -34,11 +34,10 @@ def to_s def variables(context) 
iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_events" { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(tracker), + cursor: context.tracker.next_page, epic_iid: iid } end diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb index 9f8099ae471440..f8c5707b5df30c 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb @@ -61,7 +61,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:epics) + cursor: context.tracker.next_page } end diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_iterations_query.rb b/ee/lib/ee/bulk_imports/groups/graphql/get_iterations_query.rb index 4392f044ade710..d49449e15ffba5 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_iterations_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_iterations_query.rb @@ -35,7 +35,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:iterations) + cursor: context.tracker.next_page } end diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb index efc4e8f2e3c235..1300ae872ccbdb 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb @@ -25,11 +25,7 @@ def initialize(context) end def after_run(extracted_data) - iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_award_emoji" - - context.entity.update_tracker_for( - relation: tracker, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb 
b/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb index e0d558c78cd616..5506cbbab8f135 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb @@ -48,11 +48,7 @@ def load(context, data) end def after_run(extracted_data) - iid = context.extra[:epic_iid] - tracker = "epic_#{iid}_events" - - context.entity.update_tracker_for( - relation: tracker, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb index 61ab3b035bff13..0e12248fd40f3d 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb @@ -24,8 +24,7 @@ def load(context, data) end def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :epics, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline.rb index 60e6aac2f2b451..39f4e8f90e68e4 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline.rb @@ -21,8 +21,7 @@ def load(context, data) end def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :iterations, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query_spec.rb index 5b2be39b9c77e3..cdb5dcaab30092 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query_spec.rb +++ 
b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_award_emoji_query_spec.rb @@ -4,7 +4,7 @@ RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicAwardEmojiQuery do it 'has a valid query' do - context = BulkImports::Pipeline::Context.new(create(:bulk_import_entity), epic_iid: 1) + context = BulkImports::Pipeline::Context.new(create(:bulk_import_tracker), epic_iid: 1) result = GitlabSchema.execute( described_class.to_s, diff --git a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_events_query_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_events_query_spec.rb index 91359244f1d29a..42fab71c233966 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_events_query_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epic_events_query_spec.rb @@ -4,7 +4,7 @@ RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicEventsQuery do it 'has a valid query' do - context = BulkImports::Pipeline::Context.new(create(:bulk_import_entity), epic_iid: 1) + context = BulkImports::Pipeline::Context.new(create(:bulk_import_tracker), epic_iid: 1) result = GitlabSchema.execute( described_class.to_s, diff --git a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epics_query_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epics_query_spec.rb index a666087cba2bda..ac82951bcad381 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epics_query_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epics_query_spec.rb @@ -4,7 +4,7 @@ RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicsQuery do it 'has a valid query' do - context = BulkImports::Pipeline::Context.new(create(:bulk_import_entity)) + context = BulkImports::Pipeline::Context.new(create(:bulk_import_tracker)) result = GitlabSchema.execute( described_class.to_s, diff --git a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_iterations_query_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_iterations_query_spec.rb index b0c2fb1f166db1..203922e42c2271 100644 --- 
a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_iterations_query_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_iterations_query_spec.rb @@ -4,8 +4,8 @@ RSpec.describe EE::BulkImports::Groups::Graphql::GetIterationsQuery do it 'has a valid query' do - entity = create(:bulk_import_entity) - context = BulkImports::Pipeline::Context.new(entity) + tracker = create(:bulk_import_tracker) + context = BulkImports::Pipeline::Context.new(tracker) query = GraphQL::Query.new( GitlabSchema, diff --git a/ee/spec/lib/ee/bulk_imports/groups/loaders/epic_award_emoji_loader_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/loaders/epic_award_emoji_loader_spec.rb index 6610628197cb29..382a65a5f799ea 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/loaders/epic_award_emoji_loader_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/loaders/epic_award_emoji_loader_spec.rb @@ -9,7 +9,8 @@ let_it_be(:epic) { create(:epic, group: group, iid: 1) } let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } - let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } let_it_be(:data) do { diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline_spec.rb index 6f49319d902422..4351e22d80f1f4 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline_spec.rb @@ -7,8 +7,8 @@ let_it_be(:user) { create(:user) } let_it_be(:group) { create(:group) } let_it_be(:epic) { create(:epic, group: group) } - let_it_be(:tracker) { "epic_#{epic.iid}_award_emoji" } let_it_be(:bulk_import) { create(:bulk_import, user: user) } + let_it_be(:entity) do create( 
:bulk_import_entity, @@ -20,7 +20,8 @@ ) end - let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } before do stub_licensed_features(epics: true) @@ -39,7 +40,7 @@ describe '#run' do it 'imports epic award emoji' do - data = extractor_data(has_next_page: false, cursor: cursor) + data = extractor_data(has_next_page: false) allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow(extractor) @@ -61,10 +62,8 @@ subject.after_run(data) - page_tracker = entity.trackers.find_by(relation: tracker) - - expect(page_tracker.has_next_page).to eq(true) - expect(page_tracker.next_page).to eq(cursor) + expect(tracker.has_next_page).to eq(true) + expect(tracker.next_page).to eq(cursor) end end @@ -76,10 +75,8 @@ subject.after_run(data) - page_tracker = entity.trackers.find_by(relation: tracker) - - expect(page_tracker.has_next_page).to eq(false) - expect(page_tracker.next_page).to be_nil + expect(tracker.has_next_page).to eq(false) + expect(tracker.next_page).to be_nil end it 'updates context with next epic iid' do diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline_spec.rb index ecda06523c9880..e4f609043943f5 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline_spec.rb @@ -7,8 +7,8 @@ let_it_be(:user) { create(:user) } let_it_be(:group) { create(:group) } let_it_be(:epic) { create(:epic, group: group) } - let_it_be(:tracker) { "epic_#{epic.iid}_events" } let_it_be(:bulk_import) { create(:bulk_import, user: user) } + let_it_be(:entity) do create( :bulk_import_entity, @@ -20,7 +20,8 @@ ) end - let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { 
create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } before do stub_licensed_features(epics: true) @@ -39,7 +40,7 @@ describe '#run' do it 'imports epic events and resource state events' do - data = extractor_data(has_next_page: false, cursor: cursor) + data = extractor_data(has_next_page: false) allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow(extractor) @@ -102,10 +103,8 @@ subject.after_run(data) - page_tracker = entity.trackers.find_by(relation: tracker) - - expect(page_tracker.has_next_page).to eq(true) - expect(page_tracker.next_page).to eq(cursor) + expect(tracker.has_next_page).to eq(true) + expect(tracker.next_page).to eq(cursor) end end @@ -117,10 +116,8 @@ subject.after_run(data) - page_tracker = entity.trackers.find_by(relation: tracker) - - expect(page_tracker.has_next_page).to eq(false) - expect(page_tracker.next_page).to be_nil + expect(tracker.has_next_page).to eq(false) + expect(tracker.next_page).to be_nil end it 'updates context with next epic iid' do diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb index 514f4a2607dc9a..8e9938d83b025a 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb @@ -6,8 +6,9 @@ let_it_be(:cursor) { 'cursor' } let_it_be(:user) { create(:user) } let_it_be(:group) { create(:group) } - let(:bulk_import) { create(:bulk_import, user: user) } - let(:entity) do + let_it_be(:bulk_import) { create(:bulk_import, user: user) } + + let_it_be(:entity) do create( :bulk_import_entity, group: group, @@ -18,7 +19,8 @@ ) end - let(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } before 
do stub_licensed_features(epics: true) @@ -116,8 +118,6 @@ subject.after_run(data) - tracker = entity.trackers.find_by(relation: :epics) - expect(tracker.has_next_page).to eq(true) expect(tracker.next_page).to eq(cursor) end @@ -131,8 +131,6 @@ subject.after_run(data) - tracker = entity.trackers.find_by(relation: :epics) - expect(tracker.has_next_page).to eq(false) expect(tracker.next_page).to be_nil end diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline_spec.rb index d2163ce86fbab2..b6887e224ffbf0 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline_spec.rb @@ -9,7 +9,7 @@ let_it_be(:timestamp) { Time.new(2020, 01, 01).utc } let_it_be(:bulk_import) { create(:bulk_import, user: user) } - let(:entity) do + let_it_be(:entity) do create( :bulk_import_entity, bulk_import: bulk_import, @@ -20,7 +20,8 @@ ) end - let(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } subject { described_class.new(context) } @@ -85,8 +86,6 @@ def extracted_data(title:, has_next_page:, cursor: nil, start_date: Date.today) subject.after_run(data) - tracker = entity.trackers.find_by(relation: :iterations) - expect(tracker.has_next_page).to eq(true) expect(tracker.next_page).to eq(cursor) end @@ -100,8 +99,6 @@ def extracted_data(title:, has_next_page:, cursor: nil, start_date: Date.today) subject.after_run(data) - tracker = entity.trackers.find_by(relation: :iterations) - expect(tracker.has_next_page).to eq(false) expect(tracker.next_page).to be_nil end diff --git a/ee/spec/lib/ee/bulk_imports/groups/transformers/epic_attributes_transformer_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/transformers/epic_attributes_transformer_spec.rb index 
0a28d21189b432..997df43fff4f9e 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/transformers/epic_attributes_transformer_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/transformers/epic_attributes_transformer_spec.rb @@ -7,7 +7,8 @@ let_it_be(:group) { create(:group) } let_it_be(:bulk_import) { create(:bulk_import, user: importer_user) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } - let_it_be(:context) { ::BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } describe '#transform' do it 'transforms the epic attributes' do diff --git a/ee/spec/lib/ee/bulk_imports/importers/group_importer_spec.rb b/ee/spec/lib/ee/bulk_imports/importers/group_importer_spec.rb index 7b93a8ea1f2396..68aedb7830adce 100644 --- a/ee/spec/lib/ee/bulk_imports/importers/group_importer_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/importers/group_importer_spec.rb @@ -3,14 +3,14 @@ require 'spec_helper' RSpec.describe BulkImports::Importers::GroupImporter do - let(:user) { create(:user) } - let(:group) { create(:group) } - let(:bulk_import) { create(:bulk_import, user: user) } - let(:bulk_import_entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import, group: group) } - let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) } - let(:context) { BulkImports::Pipeline::Context.new(bulk_import_entity) } + let_it_be(:user) { create(:user) } + let_it_be(:group) { create(:group) } + let_it_be(:bulk_import) { create(:bulk_import, user: user) } + let_it_be(:entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import, group: group) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } - subject { described_class.new(bulk_import_entity) } + subject { described_class.new(entity) 
} before do allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context) @@ -25,10 +25,12 @@ expect_to_run_pipeline BulkImports::Groups::Pipelines::MilestonesPipeline, context: context expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicsPipeline, context: context expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline, context: context + expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicEventsPipeline, context: context + expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::IterationsPipeline, context: context subject.execute - expect(bulk_import_entity.reload).to be_finished + expect(entity.reload).to be_finished end end diff --git a/lib/bulk_imports/groups/graphql/get_labels_query.rb b/lib/bulk_imports/groups/graphql/get_labels_query.rb index 23efbc33581019..2d246a217a7bf4 100644 --- a/lib/bulk_imports/groups/graphql/get_labels_query.rb +++ b/lib/bulk_imports/groups/graphql/get_labels_query.rb @@ -31,7 +31,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:labels) + cursor: context.tracker.next_page } end diff --git a/lib/bulk_imports/groups/graphql/get_members_query.rb b/lib/bulk_imports/groups/graphql/get_members_query.rb index e3a78124a47cb1..f3a0d77acc2f94 100644 --- a/lib/bulk_imports/groups/graphql/get_members_query.rb +++ b/lib/bulk_imports/groups/graphql/get_members_query.rb @@ -34,7 +34,7 @@ def to_s def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:group_members) + cursor: context.tracker.next_page } end diff --git a/lib/bulk_imports/groups/graphql/get_milestones_query.rb b/lib/bulk_imports/groups/graphql/get_milestones_query.rb index 2ade87e6fa091d..c254b35054a212 100644 --- a/lib/bulk_imports/groups/graphql/get_milestones_query.rb +++ b/lib/bulk_imports/groups/graphql/get_milestones_query.rb @@ -33,7 +33,7 @@ def to_s def variables(context) { full_path: 
context.entity.source_full_path, - cursor: context.entity.next_page_for(:milestones) + cursor: context.tracker.next_page } end diff --git a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb index 9f8b86827510ac..61d3e6c700e949 100644 --- a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb @@ -16,8 +16,7 @@ def load(context, data) end def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :labels, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/lib/bulk_imports/groups/pipelines/members_pipeline.rb b/lib/bulk_imports/groups/pipelines/members_pipeline.rb index 32fc931e8c33a2..d29bd74c5ae01e 100644 --- a/lib/bulk_imports/groups/pipelines/members_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/members_pipeline.rb @@ -19,8 +19,7 @@ def load(context, data) end def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :group_members, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb index 8497162e0e73c7..eb51424c14a4ca 100644 --- a/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb @@ -20,8 +20,7 @@ def load(context, data) end def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :milestones, + tracker.update( has_next_page: extracted_data.has_next_page?, next_page: extracted_data.next_page ) diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb index f016b552fd45e5..ccc61ee787daf5 100644 --- a/lib/bulk_imports/importers/group_importer.rb +++ b/lib/bulk_imports/importers/group_importer.rb @@ -8,9 +8,18 @@ def 
initialize(entity) end def execute - context = BulkImports::Pipeline::Context.new(entity) + pipelines.each.with_index do |pipeline, stage| + pipeline_tracker = entity.trackers.create!( + pipeline_name: pipeline, + stage: stage + ) - pipelines.each { |pipeline| pipeline.new(context).run } + context = BulkImports::Pipeline::Context.new(pipeline_tracker) + + pipeline.new(context).run + + pipeline_tracker.finish! + end entity.finish! end diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb index 14445162737986..df4f020d6b2f11 100644 --- a/lib/bulk_imports/pipeline.rb +++ b/lib/bulk_imports/pipeline.rb @@ -15,6 +15,10 @@ def initialize(context) @context = context end + def tracker + @tracker ||= context.tracker + end + included do private diff --git a/lib/bulk_imports/pipeline/context.rb b/lib/bulk_imports/pipeline/context.rb index dd121b2dbedcca..3c69c729f36fbf 100644 --- a/lib/bulk_imports/pipeline/context.rb +++ b/lib/bulk_imports/pipeline/context.rb @@ -3,25 +3,33 @@ module BulkImports module Pipeline class Context - attr_reader :entity, :bulk_import attr_accessor :extra - def initialize(entity, extra = {}) - @entity = entity - @bulk_import = entity.bulk_import + attr_reader :tracker + + def initialize(tracker, extra = {}) + @tracker = tracker @extra = extra end + def entity + @entity ||= tracker.entity + end + def group - entity.group + @group ||= entity.group + end + + def bulk_import + @bulk_import ||= entity.bulk_import end def current_user - bulk_import.user + @current_user ||= bulk_import.user end def configuration - bulk_import.configuration + @configuration ||= bulk_import.configuration end end end diff --git a/lib/bulk_imports/pipeline/extracted_data.rb b/lib/bulk_imports/pipeline/extracted_data.rb index 685a91a4afeb75..15b5caa1a47894 100644 --- a/lib/bulk_imports/pipeline/extracted_data.rb +++ b/lib/bulk_imports/pipeline/extracted_data.rb @@ -11,11 +11,14 @@ def initialize(data: nil, page_info: {}) end def has_next_page? 
- @page_info['has_next_page'] + Gitlab::Utils.to_boolean( + @page_info&.dig('has_next_page'), + default: false + ) end def next_page - @page_info['end_cursor'] + @page_info&.dig('end_cursor') end def each(&block) diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index e3535e585cc32b..588d2c87209472 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -26,7 +26,7 @@ def run end end - if respond_to?(:after_run) + if extracted_data && respond_to?(:after_run) run_pipeline_step(:after_run) do after_run(extracted_data) end @@ -34,7 +34,7 @@ def run info(message: 'Pipeline finished') rescue MarkedAsFailedError - log_skip + skip!('Skipping pipeline due to failed entity') end private # rubocop:disable Lint/UselessAccessModifier @@ -46,7 +46,11 @@ def run_pipeline_step(step, class_name = nil) yield rescue MarkedAsFailedError - log_skip(step => class_name) + skip!( + 'Skipping pipeline due to failed entity', + pipeline_step: step, + step_class: class_name + ) rescue => e log_import_failure(e, step) @@ -65,10 +69,13 @@ def mark_as_failed warn(message: 'Pipeline failed') context.entity.fail_op! + tracker.fail_op! end - def log_skip(extra = {}) - info({ message: 'Skipping due to failed pipeline status' }.merge(extra)) + def skip!(message, extra = {}) + warn({ message: message }.merge(extra)) + + tracker.skip! 
end def log_import_failure(exception, step) diff --git a/spec/lib/bulk_imports/common/transformers/user_reference_transformer_spec.rb b/spec/lib/bulk_imports/common/transformers/user_reference_transformer_spec.rb index ff11a10bfe9a96..e86a584d38a601 100644 --- a/spec/lib/bulk_imports/common/transformers/user_reference_transformer_spec.rb +++ b/spec/lib/bulk_imports/common/transformers/user_reference_transformer_spec.rb @@ -8,7 +8,8 @@ let_it_be(:group) { create(:group) } let_it_be(:bulk_import) { create(:bulk_import) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } - let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } let(:hash) do { diff --git a/spec/lib/bulk_imports/groups/extractors/subgroups_extractor_spec.rb b/spec/lib/bulk_imports/groups/extractors/subgroups_extractor_spec.rb index 627247c04abb8b..ac8786440e9e01 100644 --- a/spec/lib/bulk_imports/groups/extractors/subgroups_extractor_spec.rb +++ b/spec/lib/bulk_imports/groups/extractors/subgroups_extractor_spec.rb @@ -8,8 +8,9 @@ bulk_import = create(:bulk_import) create(:bulk_import_configuration, bulk_import: bulk_import) entity = create(:bulk_import_entity, bulk_import: bulk_import) + tracker = create(:bulk_import_tracker, entity: entity) response = [{ 'test' => 'group' }] - context = BulkImports::Pipeline::Context.new(entity) + context = BulkImports::Pipeline::Context.new(tracker) allow_next_instance_of(BulkImports::Clients::Http) do |client| allow(client).to receive(:each_page).and_return(response) diff --git a/spec/lib/bulk_imports/groups/graphql/get_group_query_spec.rb b/spec/lib/bulk_imports/groups/graphql/get_group_query_spec.rb index ef46da7062b746..b0f8f74783b923 100644 --- a/spec/lib/bulk_imports/groups/graphql/get_group_query_spec.rb +++ b/spec/lib/bulk_imports/groups/graphql/get_group_query_spec.rb 
@@ -4,10 +4,10 @@ RSpec.describe BulkImports::Groups::Graphql::GetGroupQuery do describe '#variables' do - let(:entity) { double(source_full_path: 'test', bulk_import: nil) } - let(:context) { BulkImports::Pipeline::Context.new(entity) } - it 'returns query variables based on entity information' do + entity = double(source_full_path: 'test', bulk_import: nil) + tracker = double(entity: entity) + context = BulkImports::Pipeline::Context.new(tracker) expected = { full_path: entity.source_full_path } expect(described_class.variables(context)).to eq(expected) diff --git a/spec/lib/bulk_imports/groups/graphql/get_labels_query_spec.rb b/spec/lib/bulk_imports/groups/graphql/get_labels_query_spec.rb index 85f82be7d18ef1..61db644a372192 100644 --- a/spec/lib/bulk_imports/groups/graphql/get_labels_query_spec.rb +++ b/spec/lib/bulk_imports/groups/graphql/get_labels_query_spec.rb @@ -4,8 +4,8 @@ RSpec.describe BulkImports::Groups::Graphql::GetLabelsQuery do it 'has a valid query' do - entity = create(:bulk_import_entity) - context = BulkImports::Pipeline::Context.new(entity) + tracker = create(:bulk_import_tracker) + context = BulkImports::Pipeline::Context.new(tracker) query = GraphQL::Query.new( GitlabSchema, diff --git a/spec/lib/bulk_imports/groups/graphql/get_members_query_spec.rb b/spec/lib/bulk_imports/groups/graphql/get_members_query_spec.rb index 5d05f5a2d30628..d0c4bb817b2c16 100644 --- a/spec/lib/bulk_imports/groups/graphql/get_members_query_spec.rb +++ b/spec/lib/bulk_imports/groups/graphql/get_members_query_spec.rb @@ -4,8 +4,8 @@ RSpec.describe BulkImports::Groups::Graphql::GetMembersQuery do it 'has a valid query' do - entity = create(:bulk_import_entity) - context = BulkImports::Pipeline::Context.new(entity) + tracker = create(:bulk_import_tracker) + context = BulkImports::Pipeline::Context.new(tracker) query = GraphQL::Query.new( GitlabSchema, diff --git a/spec/lib/bulk_imports/groups/graphql/get_milestones_query_spec.rb 
b/spec/lib/bulk_imports/groups/graphql/get_milestones_query_spec.rb index a38505fbf85f39..7a0f964c5f3537 100644 --- a/spec/lib/bulk_imports/groups/graphql/get_milestones_query_spec.rb +++ b/spec/lib/bulk_imports/groups/graphql/get_milestones_query_spec.rb @@ -4,8 +4,8 @@ RSpec.describe BulkImports::Groups::Graphql::GetMilestonesQuery do it 'has a valid query' do - entity = create(:bulk_import_entity) - context = BulkImports::Pipeline::Context.new(entity) + tracker = create(:bulk_import_tracker) + context = BulkImports::Pipeline::Context.new(tracker) query = GraphQL::Query.new( GitlabSchema, diff --git a/spec/lib/bulk_imports/groups/loaders/group_loader_spec.rb b/spec/lib/bulk_imports/groups/loaders/group_loader_spec.rb index 183292722d2d08..533955b057cf8c 100644 --- a/spec/lib/bulk_imports/groups/loaders/group_loader_spec.rb +++ b/spec/lib/bulk_imports/groups/loaders/group_loader_spec.rb @@ -4,12 +4,13 @@ RSpec.describe BulkImports::Groups::Loaders::GroupLoader do describe '#load' do - let(:user) { create(:user) } - let(:data) { { foo: :bar } } + let_it_be(:user) { create(:user) } + let_it_be(:bulk_import) { create(:bulk_import, user: user) } + let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } let(:service_double) { instance_double(::Groups::CreateService) } - let(:bulk_import) { create(:bulk_import, user: user) } - let(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) } - let(:context) { BulkImports::Pipeline::Context.new(entity) } + let(:data) { { foo: :bar } } subject { described_class.new } diff --git a/spec/lib/bulk_imports/groups/pipelines/group_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/group_pipeline_spec.rb index 61950cdd9b016e..39e782dc093e2a 100644 --- a/spec/lib/bulk_imports/groups/pipelines/group_pipeline_spec.rb +++ 
b/spec/lib/bulk_imports/groups/pipelines/group_pipeline_spec.rb @@ -4,10 +4,11 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do describe '#run' do - let(:user) { create(:user) } - let(:parent) { create(:group) } - let(:bulk_import) { create(:bulk_import, user: user) } - let(:entity) do + let_it_be(:user) { create(:user) } + let_it_be(:parent) { create(:group) } + let_it_be(:bulk_import) { create(:bulk_import, user: user) } + + let_it_be(:entity) do create( :bulk_import_entity, bulk_import: bulk_import, @@ -17,7 +18,8 @@ ) end - let(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } let(:group_data) do { @@ -37,7 +39,7 @@ before do allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| - allow(extractor).to receive(:extract).and_return([group_data]) + allow(extractor).to receive(:extract).and_return(BulkImports::Pipeline::ExtractedData.new(data: group_data)) end parent.add_owner(user) diff --git a/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb index 3327a30f1d515f..80ad5b69a61f3a 100644 --- a/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb +++ b/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb @@ -3,11 +3,12 @@ require 'spec_helper' RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do - let(:user) { create(:user) } - let(:group) { create(:group) } - let(:cursor) { 'cursor' } - let(:timestamp) { Time.new(2020, 01, 01).utc } - let(:entity) do + let_it_be(:user) { create(:user) } + let_it_be(:group) { create(:group) } + let_it_be(:cursor) { 'cursor' } + let_it_be(:timestamp) { Time.new(2020, 01, 01).utc } + + let_it_be(:entity) do create( :bulk_import_entity, source_full_path: 'source/full/path', @@ -17,7 +18,8 @@ ) end - let(:context) { 
BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } subject { described_class.new(context) } @@ -72,8 +74,6 @@ def extractor_data(title:, has_next_page:, cursor: nil) subject.after_run(data) - tracker = entity.trackers.find_by(relation: :labels) - expect(tracker.has_next_page).to eq(true) expect(tracker.next_page).to eq(cursor) end @@ -87,8 +87,6 @@ def extractor_data(title:, has_next_page:, cursor: nil) subject.after_run(data) - tracker = entity.trackers.find_by(relation: :labels) - expect(tracker.has_next_page).to eq(false) expect(tracker.next_page).to be_nil end diff --git a/spec/lib/bulk_imports/groups/pipelines/members_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/members_pipeline_spec.rb index 74d3e09d263cc9..5c82a028751cfb 100644 --- a/spec/lib/bulk_imports/groups/pipelines/members_pipeline_spec.rb +++ b/spec/lib/bulk_imports/groups/pipelines/members_pipeline_spec.rb @@ -11,7 +11,8 @@ let_it_be(:cursor) { 'cursor' } let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } - let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } subject { described_class.new(context) } diff --git a/spec/lib/bulk_imports/groups/pipelines/milestones_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/milestones_pipeline_spec.rb index f0c34c65257df6..15a64a70ff3ea4 100644 --- a/spec/lib/bulk_imports/groups/pipelines/milestones_pipeline_spec.rb +++ b/spec/lib/bulk_imports/groups/pipelines/milestones_pipeline_spec.rb @@ -9,7 +9,7 @@ let_it_be(:timestamp) { Time.new(2020, 01, 01).utc } let_it_be(:bulk_import) { create(:bulk_import, user: user) } - let(:entity) do + let_it_be(:entity) do 
create( :bulk_import_entity, bulk_import: bulk_import, @@ -20,7 +20,8 @@ ) end - let(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } subject { described_class.new(context) } @@ -84,8 +85,6 @@ def extracted_data(title:, has_next_page:, cursor: nil) subject.after_run(data) - tracker = entity.trackers.find_by(relation: :milestones) - expect(tracker.has_next_page).to eq(true) expect(tracker.next_page).to eq(cursor) end @@ -99,8 +98,6 @@ def extracted_data(title:, has_next_page:, cursor: nil) subject.after_run(data) - tracker = entity.trackers.find_by(relation: :milestones) - expect(tracker.has_next_page).to eq(false) expect(tracker.next_page).to be_nil end diff --git a/spec/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline_spec.rb index 2a99646bb4ae2b..fd7265aea34fd3 100644 --- a/spec/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline_spec.rb +++ b/spec/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline_spec.rb @@ -6,19 +6,13 @@ let_it_be(:user) { create(:user) } let_it_be(:group) { create(:group, path: 'group') } let_it_be(:parent) { create(:group, name: 'imported-group', path: 'imported-group') } - let(:context) { BulkImports::Pipeline::Context.new(parent_entity) } + let_it_be(:parent_entity) { create(:bulk_import_entity, destination_namespace: parent.full_path, group: parent) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: parent_entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } subject { described_class.new(context) } describe '#run' do - let!(:parent_entity) do - create( - :bulk_import_entity, - destination_namespace: parent.full_path, - group: parent - ) - end - let(:subgroup_data) do [ { diff --git 
a/spec/lib/bulk_imports/groups/transformers/group_attributes_transformer_spec.rb b/spec/lib/bulk_imports/groups/transformers/group_attributes_transformer_spec.rb index b3fe8a2ba25a18..75d8c15088ad82 100644 --- a/spec/lib/bulk_imports/groups/transformers/group_attributes_transformer_spec.rb +++ b/spec/lib/bulk_imports/groups/transformers/group_attributes_transformer_spec.rb @@ -4,11 +4,12 @@ RSpec.describe BulkImports::Groups::Transformers::GroupAttributesTransformer do describe '#transform' do - let(:user) { create(:user) } - let(:parent) { create(:group) } - let(:group) { create(:group, name: 'My Source Group', parent: parent) } - let(:bulk_import) { create(:bulk_import, user: user) } - let(:entity) do + let_it_be(:user) { create(:user) } + let_it_be(:parent) { create(:group) } + let_it_be(:group) { create(:group, name: 'My Source Group', parent: parent) } + let_it_be(:bulk_import) { create(:bulk_import, user: user) } + + let_it_be(:entity) do create( :bulk_import_entity, bulk_import: bulk_import, @@ -18,7 +19,8 @@ ) end - let(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } let(:data) do { @@ -82,14 +84,7 @@ context 'when destination namespace is empty' do it 'does not set parent id' do - entity = create( - :bulk_import_entity, - bulk_import: bulk_import, - source_full_path: 'source/full/path', - destination_name: group.name, - destination_namespace: '' - ) - context = BulkImports::Pipeline::Context.new(entity) + entity.update!(destination_namespace: '') transformed_data = subject.transform(context, data) diff --git a/spec/lib/bulk_imports/groups/transformers/member_attributes_transformer_spec.rb b/spec/lib/bulk_imports/groups/transformers/member_attributes_transformer_spec.rb index f66c67fc6a2b80..f3905a4b6e4a9f 100644 --- a/spec/lib/bulk_imports/groups/transformers/member_attributes_transformer_spec.rb +++ 
b/spec/lib/bulk_imports/groups/transformers/member_attributes_transformer_spec.rb @@ -8,7 +8,8 @@ let_it_be(:group) { create(:group) } let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } - let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } it 'returns nil when receives no data' do expect(subject.transform(context, nil)).to eq(nil) diff --git a/spec/lib/bulk_imports/importers/group_importer_spec.rb b/spec/lib/bulk_imports/importers/group_importer_spec.rb index 5d501b49e41aad..9579ee53251092 100644 --- a/spec/lib/bulk_imports/importers/group_importer_spec.rb +++ b/spec/lib/bulk_imports/importers/group_importer_spec.rb @@ -3,18 +3,18 @@ require 'spec_helper' RSpec.describe BulkImports::Importers::GroupImporter do - let(:user) { create(:user) } - let(:group) { create(:group) } - let(:bulk_import) { create(:bulk_import) } - let(:bulk_import_entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import, group: group) } - let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) } - let(:context) { BulkImports::Pipeline::Context.new(bulk_import_entity) } + let_it_be(:user) { create(:user) } + let_it_be(:group) { create(:group) } + let_it_be(:bulk_import) { create(:bulk_import) } + let_it_be(:entity) { create(:bulk_import_entity, :started, group: group) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity, pipeline_name: described_class.name) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) } before do allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context) end - subject { described_class.new(bulk_import_entity) } + subject { described_class.new(entity) } describe '#execute' do it 'starts the entity and run its 
pipelines' do @@ -33,18 +33,18 @@ subject.execute - expect(bulk_import_entity.reload).to be_finished + expect(entity).to be_finished end context 'when failed' do - let(:bulk_import_entity) { create(:bulk_import_entity, :failed, bulk_import: bulk_import, group: group) } + let(:entity) { create(:bulk_import_entity, :failed, bulk_import: bulk_import, group: group) } it 'does not transition entity to finished state' do - allow(bulk_import_entity).to receive(:start!) + allow(entity).to receive(:start!) subject.execute - expect(bulk_import_entity.reload).to be_failed + expect(entity.reload).to be_failed end end end diff --git a/spec/lib/bulk_imports/pipeline/context_spec.rb b/spec/lib/bulk_imports/pipeline/context_spec.rb index c8c3fe3a861db3..5b7711ad5d7133 100644 --- a/spec/lib/bulk_imports/pipeline/context_spec.rb +++ b/spec/lib/bulk_imports/pipeline/context_spec.rb @@ -3,29 +3,52 @@ require 'spec_helper' RSpec.describe BulkImports::Pipeline::Context do - let(:group) { instance_double(Group) } - let(:user) { instance_double(User) } - let(:bulk_import) { instance_double(BulkImport, user: user, configuration: :config) } - - let(:entity) do - instance_double( - BulkImports::Entity, - bulk_import: bulk_import, - group: group + let_it_be(:user) { create(:user) } + let_it_be(:group) { create(:group) } + let_it_be(:bulk_import) { create(:bulk_import, user: user) } + + let_it_be(:entity) do + create( + :bulk_import_entity, + source_full_path: 'source/full/path', + destination_name: 'My Destination Group', + destination_namespace: group.full_path, + group: group, + bulk_import: bulk_import + ) + end + + let_it_be(:tracker) do + create( + :bulk_import_tracker, + entity: entity, + pipeline_name: described_class.name ) end - subject { described_class.new(entity) } + subject { described_class.new(tracker, extra: :data) } + + describe '#entity' do + it { expect(subject.entity).to eq(entity) } + end describe '#group' do it { expect(subject.group).to eq(group) } end + describe 
'#bulk_import' do + it { expect(subject.bulk_import).to eq(bulk_import) } + end + describe '#current_user' do it { expect(subject.current_user).to eq(user) } end - describe '#current_user' do + describe '#configuration' do it { expect(subject.configuration).to eq(bulk_import.configuration) } end + + describe '#extra' do + it { expect(subject.extra).to eq(extra: :data) } + end end diff --git a/spec/lib/bulk_imports/pipeline/runner_spec.rb b/spec/lib/bulk_imports/pipeline/runner_spec.rb index 59f01c9caaa5dc..29fd1ee3ffcf9c 100644 --- a/spec/lib/bulk_imports/pipeline/runner_spec.rb +++ b/spec/lib/bulk_imports/pipeline/runner_spec.rb @@ -45,8 +45,9 @@ def after_run(_); end stub_const('BulkImports::MyPipeline', pipeline) end - let_it_be_with_refind(:entity) { create(:bulk_import_entity) } - let(:context) { BulkImports::Pipeline::Context.new(entity, extra: :data) } + let_it_be_with_reload(:entity) { create(:bulk_import_entity) } + let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } + let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker, extra: :data) } subject { BulkImports::MyPipeline.new(context) } @@ -170,12 +171,7 @@ def after_run(_); end BulkImports::MyPipeline.abort_on_failure! end - it 'marks entity as failed' do - expect { subject.run } - .to change(entity, :status_name).to(:failed) - end - - it 'logs warn message' do + it 'logs a warn message and marks entity as failed' do expect_next_instance_of(Gitlab::Import::Logger) do |logger| expect(logger).to receive(:warn) .with( @@ -188,6 +184,9 @@ def after_run(_); end end subject.run + + expect(entity.status_name).to eq(:failed) + expect(tracker.status_name).to eq(:failed) end end @@ -206,11 +205,11 @@ def after_run(_); end entity.fail_op! 
expect_next_instance_of(Gitlab::Import::Logger) do |logger| - expect(logger).to receive(:info) + expect(logger).to receive(:warn) .with( log_params( context, - message: 'Skipping due to failed pipeline status', + message: 'Skipping pipeline due to failed entity', pipeline_class: 'BulkImports::MyPipeline' ) ) diff --git a/spec/lib/bulk_imports/pipeline_spec.rb b/spec/lib/bulk_imports/pipeline_spec.rb index c882e3d26eaed4..dda2e41f06cb0e 100644 --- a/spec/lib/bulk_imports/pipeline_spec.rb +++ b/spec/lib/bulk_imports/pipeline_spec.rb @@ -3,6 +3,8 @@ require 'spec_helper' RSpec.describe BulkImports::Pipeline do + let(:context) { instance_double(BulkImports::Pipeline::Context, tracker: nil) } + before do stub_const('BulkImports::Extractor', Class.new) stub_const('BulkImports::Transformer', Class.new) @@ -44,7 +46,7 @@ def load; end end it 'returns itself when retrieving extractor & loader' do - pipeline = BulkImports::AnotherPipeline.new(nil) + pipeline = BulkImports::AnotherPipeline.new(context) expect(pipeline.send(:extractor)).to eq(pipeline) expect(pipeline.send(:loader)).to eq(pipeline) @@ -83,7 +85,7 @@ def load; end expect(BulkImports::Transformer).to receive(:new).with(foo: :bar) expect(BulkImports::Loader).to receive(:new).with(foo: :bar) - pipeline = BulkImports::MyPipeline.new(nil) + pipeline = BulkImports::MyPipeline.new(context) pipeline.send(:extractor) pipeline.send(:transformers) @@ -109,7 +111,7 @@ def load; end expect(BulkImports::Transformer).to receive(:new).with(no_args) expect(BulkImports::Loader).to receive(:new).with(no_args) - pipeline = BulkImports::NoOptionsPipeline.new(nil) + pipeline = BulkImports::NoOptionsPipeline.new(context) pipeline.send(:extractor) pipeline.send(:transformers) @@ -135,7 +137,7 @@ def transform; end transformer = double allow(BulkImports::Transformer).to receive(:new).and_return(transformer) - pipeline = BulkImports::TransformersPipeline.new(nil) + pipeline = BulkImports::TransformersPipeline.new(context) 
expect(pipeline.send(:transformers)).to eq([pipeline, transformer]) end diff --git a/spec/models/bulk_imports/entity_spec.rb b/spec/models/bulk_imports/entity_spec.rb index 17ab4d5954cc66..652ea431696f29 100644 --- a/spec/models/bulk_imports/entity_spec.rb +++ b/spec/models/bulk_imports/entity_spec.rb @@ -125,68 +125,4 @@ end end end - - describe "#update_tracker_for" do - let(:entity) { create(:bulk_import_entity) } - - it "inserts new tracker when it does not exist" do - expect do - entity.update_tracker_for(relation: :relation, has_next_page: false) - end.to change(BulkImports::Tracker, :count).by(1) - - tracker = entity.trackers.last - - expect(tracker.relation).to eq('relation') - expect(tracker.has_next_page).to eq(false) - expect(tracker.next_page).to eq(nil) - end - - it "updates the tracker if it already exist" do - create( - :bulk_import_tracker, - relation: :relation, - has_next_page: false, - entity: entity - ) - - expect do - entity.update_tracker_for(relation: :relation, has_next_page: true, next_page: 'nextPage') - end.not_to change(BulkImports::Tracker, :count) - - tracker = entity.trackers.last - - expect(tracker.relation).to eq('relation') - expect(tracker.has_next_page).to eq(true) - expect(tracker.next_page).to eq('nextPage') - end - end - - describe "#has_next_page?" do - it "queries for the given relation if it has more pages to be fetched" do - entity = create(:bulk_import_entity) - create( - :bulk_import_tracker, - relation: :relation, - has_next_page: false, - entity: entity - ) - - expect(entity.has_next_page?(:relation)).to eq(false) - end - end - - describe "#next_page_for" do - it "queries for the next page of the given relation" do - entity = create(:bulk_import_entity) - create( - :bulk_import_tracker, - relation: :relation, - has_next_page: false, - next_page: 'nextPage', - entity: entity - ) - - expect(entity.next_page_for(:relation)).to eq('nextPage') - end - end end -- GitLab