From b7c33981648e5e29d29fc937cef466ac81a7e7e4 Mon Sep 17 00:00:00 2001
From: Kassio Borges
Date: Sat, 6 Mar 2021 02:30:18 +0000
Subject: [PATCH] BulkImports: Remove after_run duplication

With the introduction of the new fields on the `BulkImports::Tracker` to
track concurrent work, the duplication in `BulkImports::Pipeline#after_run`
across most of the pipelines became more evident.

This commit DRYs up that duplication by moving the `after_run` logic into
the `BulkImports::Pipeline` concern.
---
 .../pipelines/epic_award_emoji_pipeline.rb    |  9 +--
 .../groups/pipelines/epic_events_pipeline.rb  |  9 +--
 .../groups/pipelines/epics_pipeline.rb        | 11 ---
 .../groups/pipelines/iterations_pipeline.rb   | 11 ---
 .../epic_award_emoji_pipeline_spec.rb         | 73 ++++++++---------
 .../pipelines/epic_events_pipeline_spec.rb    | 76 +++++++++---------
 .../groups/pipelines/epics_pipeline_spec.rb   | 48 +++--------
 .../pipelines/iterations_pipeline_spec.rb     | 78 ++++++------------
 .../groups/pipelines/labels_pipeline.rb       | 11 ---
 .../groups/pipelines/members_pipeline.rb      | 11 ---
 .../groups/pipelines/milestones_pipeline.rb   | 11 ---
 lib/bulk_imports/pipeline/runner.rb           | 27 ++++---
 .../groups/pipelines/labels_pipeline_spec.rb  | 71 +++++-----------
 .../groups/pipelines/members_pipeline_spec.rb | 11 ++-
 .../pipelines/milestones_pipeline_spec.rb     | 80 +++++++------------
 .../subgroup_entities_pipeline_spec.rb        | 18 ++---
 spec/lib/bulk_imports/pipeline/runner_spec.rb | 46 ++++++++---
 17 files changed, 229 insertions(+), 372 deletions(-)

diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb
index 1300ae872ccbdb..81f507a1a5282a 100644
--- a/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb
+++ b/ee/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline.rb
@@ -24,12 +24,9 @@ def initialize(context)
             set_next_epic
           end
 
-          def after_run(extracted_data)
-            tracker.update(
-              has_next_page: extracted_data.has_next_page?,
-              next_page: extracted_data.next_page
-            )
+          private
 
+          def after_run(extracted_data)
             set_next_epic unless extracted_data.has_next_page?
 
             if extracted_data.has_next_page? || context.extra[:epic_iid]
@@ -37,8 +34,6 @@ def after_run(extracted_data)
             end
           end
 
-          private
-
           def set_next_epic
             context.extra[:epic_iid] = @epic_iids.pop
           end
diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb
index 5506cbbab8f135..ef8dd70e68622c 100644
--- a/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb
+++ b/ee/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline.rb
@@ -47,12 +47,9 @@ def load(context, data)
             end
           end
 
-          def after_run(extracted_data)
-            tracker.update(
-              has_next_page: extracted_data.has_next_page?,
-              next_page: extracted_data.next_page
-            )
+          private
 
+          def after_run(extracted_data)
             set_next_epic unless extracted_data.has_next_page?
 
             if extracted_data.has_next_page? || context.extra[:epic_iid]
@@ -60,8 +57,6 @@ def after_run(extracted_data)
             end
           end
 
-          private
-
           def set_next_epic
             context.extra[:epic_iid] = @epic_iids.pop
           end
diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb
index 0e12248fd40f3d..044e96077f1d29 100644
--- a/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb
+++ b/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb
@@ -23,17 +23,6 @@ def load(context, data)
             context.group.epics.create!(data)
           end
 
-          def after_run(extracted_data)
-            tracker.update(
-              has_next_page: extracted_data.has_next_page?,
-              next_page: extracted_data.next_page
-            )
-
-            if extracted_data.has_next_page?
-              run
-            end
-          end
-
           private
 
           def authorized?
diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline.rb
index 39f4e8f90e68e4..de87c80a266558 100644
--- a/ee/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline.rb
+++ b/ee/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline.rb
@@ -20,17 +20,6 @@ def load(context, data)
             context.group.iterations.create!(data)
           end
 
-          def after_run(extracted_data)
-            tracker.update(
-              has_next_page: extracted_data.has_next_page?,
-              next_page: extracted_data.next_page
-            )
-
-            if extracted_data.has_next_page?
-              run
-            end
-          end
-
           private
 
           def authorized?
diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline_spec.rb
index 4351e22d80f1f4..8ba45c5620c356 100644
--- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline_spec.rb
+++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_award_emoji_pipeline_spec.rb
@@ -3,7 +3,6 @@
 require 'spec_helper'
 
 RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
-  let_it_be(:cursor) { 'cursor' }
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
   let_it_be(:epic) { create(:epic, group: group) }
@@ -40,12 +39,10 @@
 
   describe '#run' do
     it 'imports epic award emoji' do
-      data = extractor_data(has_next_page: false)
-
       allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
         allow(extractor)
           .to receive(:extract)
-          .and_return(data)
+          .and_return(extracted_data)
       end
 
       expect { subject.run }.to change(::AwardEmoji, :count).by(1)
@@ -53,42 +50,46 @@
     end
   end
 
-  describe '#after_run' do
-    context 'when extracted data has next page' do
-      it 'updates tracker information and runs pipeline again' do
-        data = extractor_data(has_next_page: true, cursor: cursor)
-
-        expect(subject).to receive(:run)
-
-        subject.after_run(data)
+  context 'when extracted data has multiple pages' do
+    it 'runs pipeline for the second page' do
+      first_page = extracted_data(has_next_page: true)
+      last_page = extracted_data
 
-        expect(tracker.has_next_page).to eq(true)
-        expect(tracker.next_page).to eq(cursor)
+      allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
+        allow(extractor)
+          .to receive(:extract)
+          .and_return(first_page, last_page)
       end
-    end
 
-    context 'when extracted data has no next page' do
-      it 'updates tracker information and does not run pipeline' do
-        data = extractor_data(has_next_page: false)
-
-        expect(subject).not_to receive(:run)
+      subject.run
+    end
+  end
 
-        subject.after_run(data)
+  context 'when there are multiple epics to import' do
+    let_it_be(:second_epic) { create(:epic, group: group) }
 
-        expect(tracker.has_next_page).to eq(false)
-        expect(tracker.next_page).to be_nil
+    it 'runs the pipeline for the next epic' do
+      allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
+        allow(extractor)
+          .to receive(:extract)
+          .twice # for each epic
+          .and_return(extracted_data)
       end
 
-      it 'updates context with next epic iid' do
-        epic2 = create(:epic, group: group)
-        data = extractor_data(has_next_page: false)
-
-        expect(subject).to receive(:run)
-
-        subject.after_run(data)
-
-        expect(context.extra[:epic_iid]).to eq(epic2.iid)
-      end
+      expect(context.extra)
+        .to receive(:[]=)
+        .with(:epic_iid, epic.iid)
+        .and_call_original
+      expect(context.extra)
+        .to receive(:[]=)
+        .with(:epic_iid, second_epic.iid)
+        .and_call_original
+      expect(context.extra)
+        .to receive(:[]=)
+        .with(:epic_iid, nil)
+        .and_call_original
+
+      subject.run
     end
   end
 
@@ -119,12 +120,12 @@
     end
   end
 
-  def extractor_data(has_next_page:, cursor: nil)
+  def extracted_data(has_next_page: false)
     data = [{ 'name' => 'thumbsup' }]
 
     page_info = {
-      'end_cursor' => cursor,
-      'has_next_page' => has_next_page
+      'has_next_page' => has_next_page,
+      'end_cursor' => has_next_page ? 'cursor' : nil
     }
 
     BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info)
diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline_spec.rb
index e4f609043943f5..61c28af112d700 100644
--- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline_spec.rb
+++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epic_events_pipeline_spec.rb
@@ -3,7 +3,6 @@
 require 'spec_helper'
 
 RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
-  let_it_be(:cursor) { 'cursor' }
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
   let_it_be(:epic) { create(:epic, group: group) }
@@ -40,12 +39,10 @@
 
   describe '#run' do
     it 'imports epic events and resource state events' do
-      data = extractor_data(has_next_page: false)
-
       allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
         allow(extractor)
           .to receive(:extract)
-          .and_return(data)
+          .and_return(extracted_data)
       end
 
       subject.run
@@ -94,42 +91,46 @@
     end
   end
 
-  describe '#after_run' do
-    context 'when extracted data has next page' do
-      it 'updates tracker information and runs pipeline again' do
-        data = extractor_data(has_next_page: true, cursor: cursor)
-
-        expect(subject).to receive(:run)
-
-        subject.after_run(data)
+  context 'when extracted data has multiple pages' do
+    it 'runs pipeline for the second page' do
+      first_page = extracted_data(has_next_page: true)
+      last_page = extracted_data
 
-        expect(tracker.has_next_page).to eq(true)
-        expect(tracker.next_page).to eq(cursor)
+      allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
        allow(extractor)
+          .to receive(:extract)
+          .and_return(first_page, last_page)
       end
-    end
-
-    context 'when extracted data has no next page' do
-      it 'updates tracker information and does not run pipeline' do
-        data = extractor_data(has_next_page: false)
 
-        expect(subject).not_to receive(:run)
+      subject.run
+    end
+  end
 
-        subject.after_run(data)
+  context 'when there are multiple epics to import' do
+    let_it_be(:second_epic) { create(:epic, group: group) }
 
-        expect(tracker.has_next_page).to eq(false)
-        expect(tracker.next_page).to be_nil
+    it 'runs the pipeline for the next epic' do
+      allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
+        allow(extractor)
+          .to receive(:extract)
+          .twice # for each epic
+          .and_return(extracted_data)
       end
 
-      it 'updates context with next epic iid' do
-        epic2 = create(:epic, group: group)
-        data = extractor_data(has_next_page: false)
-
-        expect(subject).to receive(:run)
+      expect(context.extra)
+        .to receive(:[]=)
+        .with(:epic_iid, epic.iid)
+        .and_call_original
+      expect(context.extra)
+        .to receive(:[]=)
+        .with(:epic_iid, second_epic.iid)
+        .and_call_original
+      expect(context.extra)
+        .to receive(:[]=)
+        .with(:epic_iid, nil)
+        .and_call_original
 
-        subject.after_run(data)
-
-        expect(context.extra[:epic_iid]).to eq(epic2.iid)
-      end
+      subject.run
     end
   end
 
@@ -156,7 +157,7 @@
     end
   end
 
-  def extractor_data(has_next_page:, cursor: nil)
+  def extracted_data(has_next_page: false)
     data = [
       {
         'action' => 'CLOSED',
@@ -169,10 +170,13 @@ def extractor_data(has_next_page:, cursor: nil)
     ]
 
     page_info = {
-      'end_cursor' => cursor,
-      'has_next_page' => has_next_page
+      'has_next_page' => has_next_page,
+      'end_cursor' => has_next_page ? 'cursor' : nil
     }
 
-    BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info)
+    BulkImports::Pipeline::ExtractedData.new(
+      data: data,
+      page_info: page_info
+    )
   end
 end
diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb
index 8e9938d83b025a..03aeedc5f00052 100644
--- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb
+++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb
@@ -3,7 +3,6 @@
 require 'spec_helper'
 
 RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_redis_cache do
-  let_it_be(:cursor) { 'cursor' }
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
   let_it_be(:bulk_import) { create(:bulk_import, user: user) }
@@ -31,8 +30,8 @@
 
   describe '#run' do
     it 'imports group epics into destination group' do
-      first_page = extractor_data(has_next_page: true, cursor: cursor)
-      last_page = extractor_data(has_next_page: false, page: 2)
+      first_page = extracted_data(has_next_page: true)
+      last_page = extracted_data
 
       allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
         allow(extractor)
@@ -88,9 +87,8 @@
       end
 
       it 'raises NotAllowedError' do
-        data = extractor_data(has_next_page: false)
-
-        expect { subject.load(context, data) }.to raise_error(::BulkImports::Pipeline::NotAllowedError)
+        expect { subject.load(context, extracted_data) }
+          .to raise_error(::BulkImports::Pipeline::NotAllowedError)
       end
     end
   end
@@ -109,34 +107,6 @@
     end
   end
 
-  describe '#after_run' do
-    context 'when extracted data has next page' do
-      it 'updates tracker information and runs pipeline again' do
-        data = extractor_data(has_next_page: true, cursor: cursor)
-
-        expect(subject).to receive(:run)
-
-        subject.after_run(data)
-
-        expect(tracker.has_next_page).to eq(true)
-        expect(tracker.next_page).to eq(cursor)
-      end
-    end
-
-    context 'when extracted data has no next page' do
-      it 'updates tracker information and does not run pipeline' do
-        data = extractor_data(has_next_page: false)
-
-        expect(subject).not_to receive(:run)
-
-        subject.after_run(data)
-
-        expect(tracker.has_next_page).to eq(false)
-        expect(tracker.next_page).to be_nil
-      end
-    end
-  end
-
   describe 'pipeline parts' do
     it { expect(described_class).to include_module(BulkImports::Pipeline) }
     it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
@@ -160,11 +130,11 @@
     end
   end
 
-  def extractor_data(has_next_page:, cursor: nil, page: 1)
+  def extracted_data(has_next_page: false)
     data = [
       {
-        'id' => "gid://gitlab/Epic/#{page}",
-        'iid' => page,
+        'id' => "gid://gitlab/Epic/99",
+        'iid' => has_next_page ? 2 : 1,
         'title' => 'epic1',
         'state' => 'closed',
         'confidential' => true,
@@ -175,8 +145,8 @@ def extractor_data(has_next_page:, cursor: nil, page: 1)
     ]
 
     page_info = {
-      'end_cursor' => cursor,
-      'has_next_page' => has_next_page
+      'has_next_page' => has_next_page,
+      'end_cursor' => has_next_page ? 'cursor' : nil
     }
 
     BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info)
diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline_spec.rb
index b6887e224ffbf0..900ea19d588440 100644
--- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline_spec.rb
+++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/iterations_pipeline_spec.rb
@@ -5,7 +5,6 @@
 RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
-  let_it_be(:cursor) { 'cursor' }
   let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
 
   let_it_be(:bulk_import) { create(:bulk_import, user: user) }
@@ -30,31 +29,10 @@
     group.add_owner(user)
   end
 
-  def iteration_data(title, start_date: Date.today)
-    {
-      'title' => title,
-      'description' => 'desc',
-      'state' => 'upcoming',
-      'start_date' => start_date,
-      'due_date' => start_date + 1.day,
-      'created_at' => timestamp.to_s,
-      'updated_at' => timestamp.to_s
-    }
-  end
-
-  def extracted_data(title:, has_next_page:, cursor: nil, start_date: Date.today)
-    page_info = {
-      'end_cursor' => cursor,
-      'has_next_page' => has_next_page
-    }
-
-    BulkImports::Pipeline::ExtractedData.new(data: [iteration_data(title, start_date: start_date)], page_info: page_info)
-  end
-
   describe '#run' do
     it 'imports group iterations' do
-      first_page = extracted_data(title: 'iteration1', has_next_page: true, cursor: cursor)
-      last_page = extracted_data(title: 'iteration2', has_next_page: false, start_date: Date.today + 2.days)
+      first_page = extracted_data(title: 'iteration1', has_next_page: true)
+      last_page = extracted_data(title: 'iteration2', start_date: Date.today + 2.days)
 
       allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
         allow(extractor)
@@ -77,34 +55,6 @@ def extracted_data(title:, has_next_page:, cursor: nil, start_date: Date.today)
     end
   end
 
-  describe '#after_run' do
-    context 'when extracted data has next page' do
-      it 'updates tracker information and runs pipeline again' do
-        data = extracted_data(title: 'iteration', has_next_page: true, cursor: cursor)
-
-        expect(subject).to receive(:run)
-
-        subject.after_run(data)
-
-        expect(tracker.has_next_page).to eq(true)
-        expect(tracker.next_page).to eq(cursor)
-      end
-    end
-
-    context 'when extracted data has no next page' do
-      it 'updates tracker information and does not run pipeline' do
-        data = extracted_data(title: 'iteration', has_next_page: false)
-
-        expect(subject).not_to receive(:run)
-
-        subject.after_run(data)
-
-        expect(tracker.has_next_page).to eq(false)
-        expect(tracker.next_page).to be_nil
-      end
-    end
-  end
-
   describe '#load' do
     it 'creates the iteration' do
       data = iteration_data('iteration')
@@ -146,4 +96,28 @@ def extracted_data(title:, has_next_page:, cursor: nil, start_date: Date.today)
     )
   end
 end
+
+  def iteration_data(title, start_date: Date.today)
+    {
+      'title' => title,
+      'description' => 'desc',
+      'state' => 'upcoming',
+      'start_date' => start_date,
+      'due_date' => start_date + 1.day,
+      'created_at' => timestamp.to_s,
+      'updated_at' => timestamp.to_s
+    }
+  end
+
+  def extracted_data(title:, start_date: Date.today, has_next_page: false)
+    page_info = {
+      'has_next_page' => has_next_page,
+      'end_cursor' => has_next_page ? 'cursor' : nil
+    }
+
+    BulkImports::Pipeline::ExtractedData.new(
+      data: iteration_data(title, start_date: start_date),
+      page_info: page_info
+    )
+  end
 end
diff --git a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb
index 61d3e6c700e949..0dc4a968b84445 100644
--- a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb
+++ b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb
@@ -14,17 +14,6 @@ class LabelsPipeline
         def load(context, data)
           Labels::CreateService.new(data).execute(group: context.group)
         end
-
-        def after_run(extracted_data)
-          tracker.update(
-            has_next_page: extracted_data.has_next_page?,
-            next_page: extracted_data.next_page
-          )
-
-          if extracted_data.has_next_page?
-            run
-          end
-        end
       end
     end
   end
diff --git a/lib/bulk_imports/groups/pipelines/members_pipeline.rb b/lib/bulk_imports/groups/pipelines/members_pipeline.rb
index d29bd74c5ae01e..5e4293d2c06f55 100644
--- a/lib/bulk_imports/groups/pipelines/members_pipeline.rb
+++ b/lib/bulk_imports/groups/pipelines/members_pipeline.rb
@@ -17,17 +17,6 @@ def load(context, data)
 
           context.group.members.create!(data)
         end
-
-        def after_run(extracted_data)
-          tracker.update(
-            has_next_page: extracted_data.has_next_page?,
-            next_page: extracted_data.next_page
-          )
-
-          if extracted_data.has_next_page?
-            run
-          end
-        end
       end
     end
   end
diff --git a/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb
index eb51424c14a4ca..9b2be30735c106 100644
--- a/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb
+++ b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb
@@ -19,17 +19,6 @@ def load(context, data)
           context.group.milestones.create!(data)
         end
 
-        def after_run(extracted_data)
-          tracker.update(
-            has_next_page: extracted_data.has_next_page?,
-            next_page: extracted_data.next_page
-          )
-
-          if extracted_data.has_next_page?
-            run
-          end
-        end
-
         private
 
         def authorized?
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index 588d2c87209472..b756fba3bee546 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -14,19 +14,24 @@ def run
 
         extracted_data = extracted_data_from
 
-        extracted_data&.each do |entry|
-          transformers.each do |transformer|
-            entry = run_pipeline_step(:transformer, transformer.class.name) do
-              transformer.transform(context, entry)
+        if extracted_data
+          extracted_data.each do |entry|
+            transformers.each do |transformer|
+              entry = run_pipeline_step(:transformer, transformer.class.name) do
+                transformer.transform(context, entry)
+              end
             end
-          end
 
-          run_pipeline_step(:loader, loader.class.name) do
-            loader.load(context, entry)
+            run_pipeline_step(:loader, loader.class.name) do
+              loader.load(context, entry)
+            end
           end
-        end
 
-        if extracted_data && respond_to?(:after_run)
+          tracker.update!(
+            has_next_page: extracted_data.has_next_page?,
+            next_page: extracted_data.next_page
+          )
+
           run_pipeline_step(:after_run) do
             after_run(extracted_data)
           end
@@ -65,6 +70,10 @@ def extracted_data_from
         end
       end
 
+      def after_run(extracted_data)
+        run if extracted_data.has_next_page?
+      end
+
       def mark_as_failed
         warn(message: 'Pipeline failed')
diff --git a/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb
index 80ad5b69a61f3a..eeed5c6d07949d 100644
--- a/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb
@@ -5,7 +5,6 @@
 RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
-  let_it_be(:cursor) { 'cursor' }
   let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
 
   let_it_be(:entity) do
@@ -23,29 +22,10 @@
 
   subject { described_class.new(context) }
 
-  def label_data(title)
-    {
-      'title' => title,
-      'description' => 'desc',
-      'color' => '#428BCA',
-      'created_at' => timestamp.to_s,
-      'updated_at' => timestamp.to_s
-    }
-  end
-
-  def extractor_data(title:, has_next_page:, cursor: nil)
-    page_info = {
-      'end_cursor' => cursor,
-      'has_next_page' => has_next_page
-    }
-
-    BulkImports::Pipeline::ExtractedData.new(data: [label_data(title)], page_info: page_info)
-  end
-
   describe '#run' do
     it 'imports a group labels' do
-      first_page = extractor_data(title: 'label1', has_next_page: true, cursor: cursor)
-      last_page = extractor_data(title: 'label2', has_next_page: false)
+      first_page = extracted_data(title: 'label1', has_next_page: true)
+      last_page = extracted_data(title: 'label2')
 
       allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
         allow(extractor)
@@ -65,34 +45,6 @@ def extractor_data(title:, has_next_page:, cursor: nil)
     end
   end
 
-  describe '#after_run' do
-    context 'when extracted data has next page' do
-      it 'updates tracker information and runs pipeline again' do
-        data = extractor_data(title: 'label', has_next_page: true, cursor: cursor)
-
-        expect(subject).to receive(:run)
-
-        subject.after_run(data)
-
-        expect(tracker.has_next_page).to eq(true)
-        expect(tracker.next_page).to eq(cursor)
-      end
-    end
-
-    context 'when extracted data has no next page' do
-      it 'updates tracker information and does not run pipeline' do
-        data = extractor_data(title: 'label', has_next_page: false)
-
-        expect(subject).not_to receive(:run)
-
-        subject.after_run(data)
-
-        expect(tracker.has_next_page).to eq(false)
-        expect(tracker.next_page).to be_nil
-      end
-    end
-  end
-
   describe '#load' do
     it 'creates the label' do
       data = label_data('label')
@@ -128,4 +80,23 @@ def extractor_data(title:, has_next_page:, cursor: nil)
     )
   end
 end
+
+  def label_data(title)
+    {
+      'title' => title,
+      'description' => 'desc',
+      'color' => '#428BCA',
+      'created_at' => timestamp.to_s,
+      'updated_at' => timestamp.to_s
+    }
+  end
+
+  def extracted_data(title:, has_next_page: false)
+    page_info = {
+      'has_next_page' => has_next_page,
+      'end_cursor' => has_next_page ? 'cursor' : nil
+    }
+
+    BulkImports::Pipeline::ExtractedData.new(data: [label_data(title)], page_info: page_info)
+  end
 end
diff --git a/spec/lib/bulk_imports/groups/pipelines/members_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/members_pipeline_spec.rb
index 5c82a028751cfb..0af45ae17d66f4 100644
--- a/spec/lib/bulk_imports/groups/pipelines/members_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/groups/pipelines/members_pipeline_spec.rb
@@ -8,7 +8,6 @@
 
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
-  let_it_be(:cursor) { 'cursor' }
   let_it_be(:bulk_import) { create(:bulk_import, user: user) }
   let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
   let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
@@ -18,8 +17,8 @@
 
   describe '#run' do
     it 'maps existing users to the imported group' do
-      first_page = member_data(email: member_user1.email, has_next_page: true, cursor: cursor)
-      last_page = member_data(email: member_user2.email, has_next_page: false)
+      first_page = extracted_data(email: member_user1.email, has_next_page: true)
+      last_page = extracted_data(email: member_user2.email)
 
       allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
         allow(extractor)
@@ -89,7 +88,7 @@
     end
   end
 
-  def member_data(email:, has_next_page:, cursor: nil)
+  def extracted_data(email:, has_next_page: false)
     data = {
       'created_at' => '2020-01-01T00:00:00Z',
       'updated_at' => '2020-01-01T00:00:00Z',
@@ -103,8 +102,8 @@ def member_data(email:, has_next_page:, cursor: nil)
     }
 
     page_info = {
-      'end_cursor' => cursor,
-      'has_next_page' => has_next_page
+      'has_next_page' => has_next_page,
+      'end_cursor' => has_next_page ? 'cursor' : nil
     }
 
     BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info)
diff --git a/spec/lib/bulk_imports/groups/pipelines/milestones_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/milestones_pipeline_spec.rb
index 15a64a70ff3ea4..3ce810268345cc 100644
--- a/spec/lib/bulk_imports/groups/pipelines/milestones_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/groups/pipelines/milestones_pipeline_spec.rb
@@ -5,7 +5,6 @@
 RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
-  let_it_be(:cursor) { 'cursor' }
   let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
 
   let_it_be(:bulk_import) { create(:bulk_import, user: user) }
@@ -25,35 +24,14 @@
 
   subject { described_class.new(context) }
 
-  def milestone_data(title)
-    {
-      'title' => title,
-      'description' => 'desc',
-      'state' => 'closed',
-      'start_date' => '2020-10-21',
-      'due_date' => '2020-10-22',
-      'created_at' => timestamp.to_s,
-      'updated_at' => timestamp.to_s
-    }
-  end
-
-  def extracted_data(title:, has_next_page:, cursor: nil)
-    page_info = {
-      'end_cursor' => cursor,
-      'has_next_page' => has_next_page
-    }
-
-    BulkImports::Pipeline::ExtractedData.new(data: [milestone_data(title)], page_info: page_info)
-  end
-
   before do
     group.add_owner(user)
   end
 
   describe '#run' do
     it 'imports group milestones' do
-      first_page = extracted_data(title: 'milestone1', has_next_page: true, cursor: cursor)
-      last_page = extracted_data(title: 'milestone2', has_next_page: false)
+      first_page = extracted_data(title: 'milestone1', has_next_page: true)
+      last_page = extracted_data(title: 'milestone2')
 
       allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
         allow(extractor)
@@ -76,34 +54,6 @@ def extracted_data(title:, has_next_page:, cursor: nil)
     end
   end
 
-  describe '#after_run' do
-    context 'when extracted data has next page' do
-      it 'updates tracker information and runs pipeline again' do
-        data = extracted_data(title: 'milestone', has_next_page: true, cursor: cursor)
-
-        expect(subject).to receive(:run)
-
-        subject.after_run(data)
-
-        expect(tracker.has_next_page).to eq(true)
-        expect(tracker.next_page).to eq(cursor)
-      end
-    end
-
-    context 'when extracted data has no next page' do
-      it 'updates tracker information and does not run pipeline' do
-        data = extracted_data(title: 'milestone', has_next_page: false)
-
-        expect(subject).not_to receive(:run)
-
-        subject.after_run(data)
-
-        expect(tracker.has_next_page).to eq(false)
-        expect(tracker.next_page).to be_nil
-      end
-    end
-  end
-
   describe '#load' do
     it 'creates the milestone' do
       data = milestone_data('milestone')
@@ -117,7 +67,7 @@ def extracted_data(title:, has_next_page:, cursor: nil)
     end
 
     it 'raises NotAllowedError' do
-      data = extracted_data(title: 'milestone', has_next_page: false)
+      data = extracted_data(title: 'milestone')
 
       expect { subject.load(context, data) }.to raise_error(::BulkImports::Pipeline::NotAllowedError)
     end
@@ -145,4 +95,28 @@ def extracted_data(title:, has_next_page:, cursor: nil)
     )
   end
 end
+
+  def milestone_data(title)
+    {
+      'title' => title,
+      'description' => 'desc',
+      'state' => 'closed',
+      'start_date' => '2020-10-21',
+      'due_date' => '2020-10-22',
+      'created_at' => timestamp.to_s,
+      'updated_at' => timestamp.to_s
+    }
+  end
+
+  def extracted_data(title:, has_next_page: false)
+    page_info = {
+      'has_next_page' => has_next_page,
+      'end_cursor' => has_next_page ? 'cursor' : nil
+    }
+
+    BulkImports::Pipeline::ExtractedData.new(
+      data: milestone_data(title),
+      page_info: page_info
+    )
+  end
 end
diff --git a/spec/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline_spec.rb
index fd7265aea34fd3..e4a41428dd2c56 100644
--- a/spec/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline_spec.rb
@@ -12,19 +12,17 @@
 
   subject { described_class.new(context) }
 
-  describe '#run' do
-    let(:subgroup_data) do
-      [
-        {
-          "name" => "subgroup",
-          "full_path" => "parent/subgroup"
-        }
-      ]
-    end
+  let(:extracted_data) do
+    BulkImports::Pipeline::ExtractedData.new(data: {
+      'name' => 'subgroup',
+      'full_path' => 'parent/subgroup'
+    })
+  end
 
+  describe '#run' do
     before do
       allow_next_instance_of(BulkImports::Groups::Extractors::SubgroupsExtractor) do |extractor|
-        allow(extractor).to receive(:extract).and_return(subgroup_data)
+        allow(extractor).to receive(:extract).and_return(extracted_data)
       end
 
       parent.add_owner(user)
diff --git a/spec/lib/bulk_imports/pipeline/runner_spec.rb b/spec/lib/bulk_imports/pipeline/runner_spec.rb
index 29fd1ee3ffcf9c..9cadc06d613c87 100644
--- a/spec/lib/bulk_imports/pipeline/runner_spec.rb
+++ b/spec/lib/bulk_imports/pipeline/runner_spec.rb
@@ -38,8 +38,6 @@ def load(context); end
       extractor BulkImports::Extractor
       transformer BulkImports::Transformer
       loader BulkImports::Loader
-
-      def after_run(_); end
     end
 
     stub_const('BulkImports::MyPipeline', pipeline)
@@ -54,8 +52,6 @@ def after_run(_); end
   describe 'pipeline runner' do
     context 'when entity is not marked as failed' do
       it 'runs pipeline extractor, transformer, loader' do
-        extracted_data = BulkImports::Pipeline::ExtractedData.new(data: { foo: :bar })
-
         expect_next_instance_of(BulkImports::Extractor) do |extractor|
           expect(extractor)
             .to receive(:extract)
@@ -133,6 +129,22 @@ def after_run(_); end
         subject.run
       end
 
+      context 'when extracted data has multiple pages' do
+        it 'updates tracker information and runs pipeline again' do
+          first_page = extracted_data(has_next_page: true)
+          last_page = extracted_data
+
+          expect_next_instance_of(BulkImports::Extractor) do |extractor|
+            expect(extractor)
+              .to receive(:extract)
+              .with(context)
+              .and_return(first_page, last_page)
+          end
+
+          subject.run
+        end
+      end
+
       context 'when exception is raised' do
         before do
           allow_next_instance_of(BulkImports::Extractor) do |extractor|
@@ -218,14 +230,24 @@ def after_run(_); end
         subject.run
       end
     end
-  end
 
-  def log_params(context, extra = {})
-    {
-      bulk_import_id: context.bulk_import.id,
-      bulk_import_entity_id: context.entity.id,
-      bulk_import_entity_type: context.entity.source_type,
-      context_extra: context.extra
-    }.merge(extra)
+    def log_params(context, extra = {})
+      {
+        bulk_import_id: context.bulk_import.id,
+        bulk_import_entity_id: context.entity.id,
+        bulk_import_entity_type: context.entity.source_type,
+        context_extra: context.extra
+      }.merge(extra)
+    end
+
+    def extracted_data(has_next_page: false)
+      BulkImports::Pipeline::ExtractedData.new(
+        data: { foo: :bar },
+        page_info: {
+          'has_next_page' => has_next_page,
+          'end_cursor' => has_next_page ? 'cursor' : nil
+        }
+      )
+    end
   end
 end
-- 
GitLab