From a381517d36ee71a0a992a00d5771dd87bc56e76e Mon Sep 17 00:00:00 2001 From: George Koltsov Date: Fri, 22 Jan 2021 12:11:53 +0000 Subject: [PATCH] Process one record at a time in Bulk Import pipelines - Update BulkImports pipelines to process one record at a time instead of operating on a whole collection - This is useful when, during the transformation of a record, we need to check whether a previously processed record is persisted in the database --- ...sov-graphql-extractor-yield-one-record.yml | 5 + .../groups/graphql/get_epics_query.rb | 34 ++++-- .../groups/loaders/epics_loader.rb | 18 +-- .../groups/pipelines/epics_pipeline.rb | 12 +- .../groups/graphql/get_epics_query_spec.rb | 31 ++++++ .../groups/loaders/epics_loader_spec.rb | 29 +---- .../groups/pipelines/epics_pipeline_spec.rb | 104 +++++++++++------- .../common/extractors/graphql_extractor.rb | 15 +-- .../common/transformers/hash_key_digger.rb | 23 ---- .../underscorify_keys_transformer.rb | 19 ---- .../groups/extractors/subgroups_extractor.rb | 4 +- .../groups/graphql/get_group_query.rb | 32 ++++-- .../groups/graphql/get_labels_query.rb | 12 ++ .../groups/loaders/labels_loader.rb | 11 +- .../groups/pipelines/group_pipeline.rb | 2 - .../groups/pipelines/labels_pipeline.rb | 11 +- lib/bulk_imports/pipeline/extracted_data.rb | 26 +++++ lib/bulk_imports/pipeline/runner.rb | 8 +- .../extractors/graphql_extractor_spec.rb | 66 ++++------- .../transformers/hash_key_digger_spec.rb | 28 ----- .../underscorify_keys_transformer_spec.rb | 27 ----- .../extractors/subgroups_extractor_spec.rb | 29 +++++ .../groups/graphql/get_group_query_spec.rb | 31 ++++++ .../groups/graphql/get_labels_query_spec.rb | 31 ++++++ .../groups/loaders/labels_loader_spec.rb | 35 ++++++ .../groups/pipelines/group_pipeline_spec.rb | 38 +++---- .../groups/pipelines/labels_pipeline_spec.rb | 67 +++++++---- .../pipeline/extracted_data_spec.rb | 53 +++++++++ spec/lib/bulk_imports/pipeline/runner_spec.rb | 16 ++- 29 files changed, 500 insertions(+), 317 deletions(-) create mode 100644 ee/changelogs/unreleased/georgekoltsov-graphql-extractor-yield-one-record.yml create mode 100644 ee/spec/lib/ee/bulk_imports/groups/graphql/get_epics_query_spec.rb delete mode 100644 lib/bulk_imports/common/transformers/hash_key_digger.rb delete mode 100644 lib/bulk_imports/common/transformers/underscorify_keys_transformer.rb create mode 100644 lib/bulk_imports/pipeline/extracted_data.rb delete mode 100644 spec/lib/bulk_imports/common/transformers/hash_key_digger_spec.rb delete mode 100644 spec/lib/bulk_imports/common/transformers/underscorify_keys_transformer_spec.rb create mode 100644 spec/lib/bulk_imports/groups/extractors/subgroups_extractor_spec.rb create mode 100644 spec/lib/bulk_imports/groups/graphql/get_group_query_spec.rb create mode 100644 spec/lib/bulk_imports/groups/graphql/get_labels_query_spec.rb create mode 100644 spec/lib/bulk_imports/groups/loaders/labels_loader_spec.rb create mode 100644 spec/lib/bulk_imports/pipeline/extracted_data_spec.rb diff --git a/ee/changelogs/unreleased/georgekoltsov-graphql-extractor-yield-one-record.yml b/ee/changelogs/unreleased/georgekoltsov-graphql-extractor-yield-one-record.yml new file mode 100644 index 00000000000000..2515b1744c9164 --- /dev/null +++ b/ee/changelogs/unreleased/georgekoltsov-graphql-extractor-yield-one-record.yml @@ -0,0 +1,5 @@ +--- +title: Process one record at a time in Bulk Import pipelines +merge_request: 52330 +author: +type: changed diff --git a/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb
b/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb index 996c30b7e2824c..6e864e777ef1d1 100644 --- a/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb +++ b/ee/lib/ee/bulk_imports/groups/graphql/get_epics_query.rb @@ -16,22 +16,22 @@ def to_s first: 100, after: $cursor ) { - pageInfo { - endCursor - hasNextPage + page_info: pageInfo { + end_cursor: endCursor + has_next_page: hasNextPage } nodes { title description state - createdAt - closedAt - startDate - startDateFixed - startDateIsFixed - dueDateFixed - dueDateIsFixed - relativePosition + created_at: createdAt + closed_at: closedAt + start_date: startDate + start_date_fixed: startDateFixed + start_date_is_fixed: startDateIsFixed + due_date_fixed: dueDateFixed + due_date_is_fixed: dueDateIsFixed + relative_position: relativePosition confidential } } @@ -46,6 +46,18 @@ def variables(entity) cursor: entity.next_page_for(:epics) } end + + def base_path + %w[data group epics] + end + + def data_path + base_path << 'nodes' + end + + def page_info_path + base_path << 'page_info' + end end end end diff --git a/ee/lib/ee/bulk_imports/groups/loaders/epics_loader.rb b/ee/lib/ee/bulk_imports/groups/loaders/epics_loader.rb index a21686b10fa500..72bb32e1566bb7 100644 --- a/ee/lib/ee/bulk_imports/groups/loaders/epics_loader.rb +++ b/ee/lib/ee/bulk_imports/groups/loaders/epics_loader.rb @@ -10,19 +10,11 @@ def initialize(options = {}) end def load(context, data) - Array.wrap(data['nodes']).each do |args| - ::Epics::CreateService.new( - context.entity.group, - context.current_user, - args - ).execute - end - - context.entity.update_tracker_for( - relation: :epics, - has_next_page: data.dig('page_info', 'has_next_page'), - next_page: data.dig('page_info', 'end_cursor') - ) + ::Epics::CreateService.new( + context.entity.group, + context.current_user, + data + ).execute end end end diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb index 4471a5d661ae91..91128ff67743c5 100644 --- a/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb +++ b/ee/lib/ee/bulk_imports/groups/pipelines/epics_pipeline.rb @@ -10,14 +10,18 @@ class EpicsPipeline extractor ::BulkImports::Common::Extractors::GraphqlExtractor, query: EE::BulkImports::Groups::Graphql::GetEpicsQuery - transformer ::BulkImports::Common::Transformers::HashKeyDigger, key_path: %w[data group epics] - transformer ::BulkImports::Common::Transformers::UnderscorifyKeysTransformer transformer ::BulkImports::Common::Transformers::ProhibitedAttributesTransformer loader EE::BulkImports::Groups::Loaders::EpicsLoader - def after_run(context) - if context.entity.has_next_page?(:epics) + def after_run(context, extracted_data) + context.entity.update_tracker_for( + relation: :epics, + has_next_page: extracted_data.has_next_page?, + next_page: extracted_data.next_page + ) + + if extracted_data.has_next_page? 
run(context) end end diff --git a/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epics_query_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epics_query_spec.rb new file mode 100644 index 00000000000000..392f8e8ebec880 --- /dev/null +++ b/ee/spec/lib/ee/bulk_imports/groups/graphql/get_epics_query_spec.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe EE::BulkImports::Groups::Graphql::GetEpicsQuery do + describe '#variables' do + let(:entity) { double(source_full_path: 'test', next_page_for: 'next_page') } + + it 'returns query variables based on entity information' do + expected = { full_path: entity.source_full_path, cursor: entity.next_page_for } + + expect(described_class.variables(entity)).to eq(expected) + end + end + + describe '#data_path' do + it 'returns data path' do + expected = %w[data group epics nodes] + + expect(described_class.data_path).to eq(expected) + end + end + + describe '#page_info_path' do + it 'returns pagination information path' do + expected = %w[data group epics page_info] + + expect(described_class.page_info_path).to eq(expected) + end + end +end diff --git a/ee/spec/lib/ee/bulk_imports/groups/loaders/epics_loader_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/loaders/epics_loader_spec.rb index fec5296d913d2a..4dd4fb6db89813 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/loaders/epics_loader_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/loaders/epics_loader_spec.rb @@ -16,35 +16,18 @@ let(:data) do { - 'page_info' => { - 'end_cursor' => 'endCursorValue', - 'has_next_page' => true - }, - 'nodes' => [ - { - 'title' => 'epic1', - 'state' => 'opened', - 'confidential' => false - }, - { - 'title' => 'epic2', - 'state' => 'closed', - 'confidential' => true - } - ] + 'title' => 'epic1', + 'state' => 'opened', + 'confidential' => false } end subject { described_class.new } - it 'creates the epics and update the entity tracker' do - expect { subject.load(context, data) }.to change(::Epic, :count).by(2) + it 'creates the epic' do + expect { subject.load(context, data) }.to change(::Epic, :count).by(1) - tracker = entity.trackers.last - - expect(group.epics.count).to eq(2) - expect(tracker.has_next_page).to eq(true) - expect(tracker.next_page).to eq('endCursorValue') + expect(group.epics.count).to eq(1) end end end diff --git a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb index 2e6a55e275b2f5..0ca6318695462d 100644 --- a/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb +++ b/ee/spec/lib/ee/bulk_imports/groups/pipelines/epics_pipeline_spec.rb @@ -3,30 +3,29 @@ require 'spec_helper' RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do - describe '#run' do - let(:user) { create(:user) } - let(:group) { create(:group) } - let(:entity) do - create( - :bulk_import_entity, - source_full_path: 'source/full/path', - destination_name: 'My Destination Group', - destination_namespace: group.full_path, - group: group - ) - end - - let(:context) do - BulkImports::Pipeline::Context.new( - current_user: user, - entity: entity - ) - end + let(:user) { create(:user) } + let(:group) { create(:group) } + let(:cursor) { 'cursor' } + let(:entity) do + create( + :bulk_import_entity, + source_full_path: 'source/full/path', + destination_name: 'My Destination Group', + destination_namespace: group.full_path, + group: group + ) + end - subject { described_class.new } + let(:context) do + 
BulkImports::Pipeline::Context.new( + current_user: user, + entity: entity + ) + end + describe '#run' do it 'imports group epics into destination group' do - first_page = extractor_data(has_next_page: true, cursor: 'nextPageCursor') + first_page = extractor_data(has_next_page: true, cursor: cursor) last_page = extractor_data(has_next_page: false) allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| @@ -39,6 +38,38 @@ end end + describe '#after_run' do + context 'when extracted data has next page' do + it 'updates tracker information and runs pipeline again' do + data = extractor_data(has_next_page: true, cursor: cursor) + + expect(subject).to receive(:run).with(context) + + subject.after_run(context, data) + + tracker = entity.trackers.find_by(relation: :epics) + + expect(tracker.has_next_page).to eq(true) + expect(tracker.next_page).to eq(cursor) + end + end + + context 'when extracted data has no next page' do + it 'updates tracker information and does not run pipeline' do + data = extractor_data(has_next_page: false) + + expect(subject).not_to receive(:run).with(context) + + subject.after_run(context, data) + + tracker = entity.trackers.find_by(relation: :epics) + + expect(tracker.has_next_page).to eq(false) + expect(tracker.next_page).to be_nil + end + end + end + describe 'pipeline parts' do it { expect(described_class).to include_module(BulkImports::Pipeline) } it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) } @@ -56,8 +87,6 @@ it 'has transformers' do expect(described_class.transformers) .to contain_exactly( - { klass: BulkImports::Common::Transformers::HashKeyDigger, options: { key_path: %w[data group epics] } }, - { klass: BulkImports::Common::Transformers::UnderscorifyKeysTransformer, options: nil }, { klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil } ) end @@ -68,26 +97,19 @@ end def extractor_data(has_next_page:, cursor: nil) - [ + data = [ { - 'data' => { - 'group' => { - 'epics' => { - 'page_info' => { - 'end_cursor' => cursor, - 'has_next_page' => has_next_page - }, - 'nodes' => [ - { - 'title' => 'epic1', - 'state' => 'closed', - 'confidential' => true - } - ] - } - } - } + 'title' => 'epic1', + 'state' => 'closed', + 'confidential' => true } ] + + page_info = { + 'end_cursor' => cursor, + 'has_next_page' => has_next_page + } + + BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info) end end diff --git a/lib/bulk_imports/common/extractors/graphql_extractor.rb b/lib/bulk_imports/common/extractors/graphql_extractor.rb index af274ee12992c6..9e53b525fe3a4a 100644 --- a/lib/bulk_imports/common/extractors/graphql_extractor.rb +++ b/lib/bulk_imports/common/extractors/graphql_extractor.rb @@ -4,17 +4,22 @@ module BulkImports module Common module Extractors class GraphqlExtractor - def initialize(query) - @query = query[:query] + def initialize(options = {}) + @query = options[:query] end def extract(context) client = graphql_client(context) - client.execute( + response = client.execute( client.parse(query.to_s), query.variables(context.entity) ).original_hash.deep_dup + + BulkImports::Pipeline::ExtractedData.new( + data: response.dig(*query.data_path), + page_info: response.dig(*query.page_info_path) + ) end private @@ -27,10 +32,6 @@ def graphql_client(context) token: context.configuration.access_token ) end - - def parsed_query - @parsed_query ||= graphql_client.parse(query.to_s) - end end end end diff --git 
a/lib/bulk_imports/common/transformers/hash_key_digger.rb b/lib/bulk_imports/common/transformers/hash_key_digger.rb deleted file mode 100644 index b4897b5b2bf9d2..00000000000000 --- a/lib/bulk_imports/common/transformers/hash_key_digger.rb +++ /dev/null @@ -1,23 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - module Common - module Transformers - class HashKeyDigger - def initialize(options = {}) - @key_path = options[:key_path] - end - - def transform(_, data) - raise ArgumentError, "Given data must be a Hash" unless data.is_a?(Hash) - - data.dig(*Array.wrap(key_path)) - end - - private - - attr_reader :key_path - end - end - end -end diff --git a/lib/bulk_imports/common/transformers/underscorify_keys_transformer.rb b/lib/bulk_imports/common/transformers/underscorify_keys_transformer.rb deleted file mode 100644 index b32ab28fdbb54b..00000000000000 --- a/lib/bulk_imports/common/transformers/underscorify_keys_transformer.rb +++ /dev/null @@ -1,19 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - module Common - module Transformers - class UnderscorifyKeysTransformer - def initialize(options = {}) - @options = options - end - - def transform(_, data) - data.deep_transform_keys do |key| - key.to_s.underscore - end - end - end - end - end -end diff --git a/lib/bulk_imports/groups/extractors/subgroups_extractor.rb b/lib/bulk_imports/groups/extractors/subgroups_extractor.rb index 5c5e686cec5f62..2924346cfc7e87 100644 --- a/lib/bulk_imports/groups/extractors/subgroups_extractor.rb +++ b/lib/bulk_imports/groups/extractors/subgroups_extractor.rb @@ -9,9 +9,11 @@ def initialize(*args); end def extract(context) encoded_parent_path = ERB::Util.url_encode(context.entity.source_full_path) - http_client(context.entity.bulk_import.configuration) + response = http_client(context.entity.bulk_import.configuration) .each_page(:get, "groups/#{encoded_parent_path}/subgroups") .flat_map(&:itself) + + BulkImports::Pipeline::ExtractedData.new(data: response) end private diff --git a/lib/bulk_imports/groups/graphql/get_group_query.rb b/lib/bulk_imports/groups/graphql/get_group_query.rb index 2bc0f60baa2c76..169c61247c30e7 100644 --- a/lib/bulk_imports/groups/graphql/get_group_query.rb +++ b/lib/bulk_imports/groups/graphql/get_group_query.rb @@ -12,18 +12,18 @@ def to_s group(fullPath: $full_path) { name path - fullPath + full_path: fullPath description visibility - emailsDisabled - lfsEnabled - mentionsDisabled - projectCreationLevel - requestAccessEnabled - requireTwoFactorAuthentication - shareWithGroupLock - subgroupCreationLevel - twoFactorGracePeriod + emails_disabled: emailsDisabled + lfs_enabled: lfsEnabled + mentions_disabled: mentionsDisabled + project_creation_level: projectCreationLevel + request_access_enabled: requestAccessEnabled + require_two_factor_authentication: requireTwoFactorAuthentication + share_with_group_lock: shareWithGroupLock + subgroup_creation_level: subgroupCreationLevel + two_factor_grace_period: twoFactorGracePeriod } } GRAPHQL @@ -32,6 +32,18 @@ def to_s def variables(entity) { full_path: entity.source_full_path } end + + def base_path + %w[data group] + end + + def data_path + base_path + end + + def page_info_path + base_path << 'page_info' + end end end end diff --git a/lib/bulk_imports/groups/graphql/get_labels_query.rb b/lib/bulk_imports/groups/graphql/get_labels_query.rb index d65c8ee27e1f50..cd57b46b9f49c7 100644 --- a/lib/bulk_imports/groups/graphql/get_labels_query.rb +++ b/lib/bulk_imports/groups/graphql/get_labels_query.rb @@ -32,6 +32,18 @@ 
def variables(entity) cursor: entity.next_page_for(:labels) } end + + def base_path + %w[data group labels] + end + + def data_path + base_path << 'nodes' + end + + def page_info_path + base_path << 'page_info' + end end end end diff --git a/lib/bulk_imports/groups/loaders/labels_loader.rb b/lib/bulk_imports/groups/loaders/labels_loader.rb index 17799ff25734ee..0f896ed25982d5 100644 --- a/lib/bulk_imports/groups/loaders/labels_loader.rb +++ b/lib/bulk_imports/groups/loaders/labels_loader.rb @@ -7,16 +7,7 @@ class LabelsLoader def initialize(*); end def load(context, data) - Array.wrap(data['nodes']).each do |entry| - Labels::CreateService.new(entry) - .execute(group: context.entity.group) - end - - context.entity.update_tracker_for( - relation: :labels, - has_next_page: data.dig('page_info', 'has_next_page'), - next_page: data.dig('page_info', 'end_cursor') - ) + Labels::CreateService.new(data).execute(group: context.entity.group) end end end diff --git a/lib/bulk_imports/groups/pipelines/group_pipeline.rb b/lib/bulk_imports/groups/pipelines/group_pipeline.rb index 5169e292180b07..8c6f089e8a4c0d 100644 --- a/lib/bulk_imports/groups/pipelines/group_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/group_pipeline.rb @@ -10,8 +10,6 @@ class GroupPipeline extractor Common::Extractors::GraphqlExtractor, query: Graphql::GetGroupQuery - transformer Common::Transformers::HashKeyDigger, key_path: %w[data group] - transformer Common::Transformers::UnderscorifyKeysTransformer transformer Common::Transformers::ProhibitedAttributesTransformer transformer Groups::Transformers::GroupAttributesTransformer diff --git a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb index 0a36f5641e1777..6b48106e5b8432 100644 --- a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb @@ -9,13 +9,18 @@ class LabelsPipeline extractor BulkImports::Common::Extractors::GraphqlExtractor, query: BulkImports::Groups::Graphql::GetLabelsQuery - transformer BulkImports::Common::Transformers::HashKeyDigger, key_path: %w[data group labels] transformer Common::Transformers::ProhibitedAttributesTransformer loader BulkImports::Groups::Loaders::LabelsLoader - def after_run(context) - if context.entity.has_next_page?(:labels) + def after_run(context, extracted_data) + context.entity.update_tracker_for( + relation: :labels, + has_next_page: extracted_data.has_next_page?, + next_page: extracted_data.next_page + ) + + if extracted_data.has_next_page? run(context) end end diff --git a/lib/bulk_imports/pipeline/extracted_data.rb b/lib/bulk_imports/pipeline/extracted_data.rb new file mode 100644 index 00000000000000..685a91a4afeb75 --- /dev/null +++ b/lib/bulk_imports/pipeline/extracted_data.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module BulkImports + module Pipeline + class ExtractedData + attr_reader :data + + def initialize(data: nil, page_info: {}) + @data = Array.wrap(data) + @page_info = page_info + end + + def has_next_page? 
+ @page_info['has_next_page'] + end + + def next_page + @page_info['end_cursor'] + end + + def each(&block) + data.each(&block) + end + end + end +end diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index c95700de973033..946d8d4562277e 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -12,7 +12,9 @@ def run(context) info(context, message: 'Pipeline started', pipeline_class: pipeline) - Array.wrap(extracted_data_from(context)).each do |entry| + extracted_data = extracted_data_from(context) + + extracted_data&.each do |entry| transformers.each do |transformer| entry = run_pipeline_step(:transformer, transformer.class.name, context) do transformer.transform(context, entry) @@ -24,7 +26,7 @@ def run(context) end end - after_run(context) if respond_to?(:after_run) + after_run(context, extracted_data) if respond_to?(:after_run) rescue MarkedAsFailedError log_skip(context) end @@ -43,6 +45,8 @@ def run_pipeline_step(step, class_name, context) log_import_failure(e, step, context) mark_as_failed(context) if abort_on_failure? + + nil end def extracted_data_from(context) diff --git a/spec/lib/bulk_imports/common/extractors/graphql_extractor_spec.rb b/spec/lib/bulk_imports/common/extractors/graphql_extractor_spec.rb index 2abd3df20fde97..80607485b6eb09 100644 --- a/spec/lib/bulk_imports/common/extractors/graphql_extractor_spec.rb +++ b/spec/lib/bulk_imports/common/extractors/graphql_extractor_spec.rb @@ -5,8 +5,18 @@ RSpec.describe BulkImports::Common::Extractors::GraphqlExtractor do let(:graphql_client) { instance_double(BulkImports::Clients::Graphql) } let(:import_entity) { create(:bulk_import_entity) } - let(:response) { double(original_hash: { foo: :bar }) } - let(:query) { { query: double(to_s: 'test', variables: {}) } } + let(:response) { double(original_hash: { 'data' => { 'foo' => 'bar' }, 'page_info' => {} }) } + let(:options) do + { + query: double( + to_s: 'test', + variables: {}, + data_path: %w[data foo], + page_info_path: %w[data page_info] + ) + } + end + let(:context) do instance_double( BulkImports::Pipeline::Context, @@ -14,58 +24,20 @@ ) end - subject { described_class.new(query) } - - before do - allow(subject).to receive(:graphql_client).and_return(graphql_client) - allow(graphql_client).to receive(:parse) - end + subject { described_class.new(options) } describe '#extract' do before do - allow(subject).to receive(:query_variables).and_return({}) - allow(graphql_client).to receive(:execute).and_return(response) - end - - it 'returns original hash' do - expect(subject.extract(context)).to eq({ foo: :bar }) - end - end - - describe 'query variables' do - before do + allow(subject).to receive(:graphql_client).and_return(graphql_client) + allow(graphql_client).to receive(:parse) allow(graphql_client).to receive(:execute).and_return(response) end - context 'when variables are present' do - let(:variables) { { foo: :bar } } - let(:query) { { query: double(to_s: 'test', variables: variables) } } - - it 'builds graphql query variables for import entity' do - expect(graphql_client).to receive(:execute).with(anything, variables) - - subject.extract(context).first - end - end - - context 'when no variables are present' do - let(:query) { { query: double(to_s: 'test', variables: nil) } } - - it 'returns empty hash' do - expect(graphql_client).to receive(:execute).with(anything, nil) - - subject.extract(context).first - end - end - - context 'when variables are empty hash' do - let(:query) { { query: 
double(to_s: 'test', variables: {}) } } - - it 'makes graphql request with empty hash' do - expect(graphql_client).to receive(:execute).with(anything, {}) + it 'returns ExtractedData' do + extracted_data = subject.extract(context) - subject.extract(context).first - end + expect(extracted_data).to be_instance_of(BulkImports::Pipeline::ExtractedData) + expect(extracted_data.data).to contain_exactly('bar') end end end diff --git a/spec/lib/bulk_imports/common/transformers/hash_key_digger_spec.rb b/spec/lib/bulk_imports/common/transformers/hash_key_digger_spec.rb deleted file mode 100644 index 2b33701653e125..00000000000000 --- a/spec/lib/bulk_imports/common/transformers/hash_key_digger_spec.rb +++ /dev/null @@ -1,28 +0,0 @@ -# frozen_string_literal: true - -require 'spec_helper' - -RSpec.describe BulkImports::Common::Transformers::HashKeyDigger do - describe '#transform' do - it 'when the key_path is an array' do - data = { foo: { bar: :value } } - key_path = %i[foo bar] - transformed = described_class.new(key_path: key_path).transform(nil, data) - - expect(transformed).to eq(:value) - end - - it 'when the key_path is not an array' do - data = { foo: { bar: :value } } - key_path = :foo - transformed = described_class.new(key_path: key_path).transform(nil, data) - - expect(transformed).to eq({ bar: :value }) - end - - it "when the data is not a hash" do - expect { described_class.new(key_path: nil).transform(nil, nil) } - .to raise_error(ArgumentError, "Given data must be a Hash") - end - end -end diff --git a/spec/lib/bulk_imports/common/transformers/underscorify_keys_transformer_spec.rb b/spec/lib/bulk_imports/common/transformers/underscorify_keys_transformer_spec.rb deleted file mode 100644 index cdffa75069492e..00000000000000 --- a/spec/lib/bulk_imports/common/transformers/underscorify_keys_transformer_spec.rb +++ /dev/null @@ -1,27 +0,0 @@ -# frozen_string_literal: true - -require 'spec_helper' - -RSpec.describe BulkImports::Common::Transformers::UnderscorifyKeysTransformer do - describe '#transform' do - it 'deep underscorifies hash keys' do - data = { - 'fullPath' => 'Foo', - 'snakeKeys' => { - 'snakeCaseKey' => 'Bar', - 'moreKeys' => { - 'anotherSnakeCaseKey' => 'Test' - } - } - } - - transformed_data = described_class.new.transform(nil, data) - - expect(transformed_data).to have_key('full_path') - expect(transformed_data).to have_key('snake_keys') - expect(transformed_data['snake_keys']).to have_key('snake_case_key') - expect(transformed_data['snake_keys']).to have_key('more_keys') - expect(transformed_data.dig('snake_keys', 'more_keys')).to have_key('another_snake_case_key') - end - end -end diff --git a/spec/lib/bulk_imports/groups/extractors/subgroups_extractor_spec.rb b/spec/lib/bulk_imports/groups/extractors/subgroups_extractor_spec.rb new file mode 100644 index 00000000000000..9dd844a0d65f71 --- /dev/null +++ b/spec/lib/bulk_imports/groups/extractors/subgroups_extractor_spec.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe BulkImports::Groups::Extractors::SubgroupsExtractor do + describe '#extract' do + it 'returns ExtractedData response' do + user = create(:user) + bulk_import = create(:bulk_import) + entity = create(:bulk_import_entity, bulk_import: bulk_import) + configuration = create(:bulk_import_configuration, bulk_import: bulk_import) + response = [{ 'test' => 'group' }] + context = BulkImports::Pipeline::Context.new( + current_user: user, + entity: entity, + configuration: configuration + ) + + 
allow_next_instance_of(BulkImports::Clients::Http) do |client| + allow(client).to receive(:each_page).and_return(response) + end + + extracted_data = subject.extract(context) + + expect(extracted_data).to be_instance_of(BulkImports::Pipeline::ExtractedData) + expect(extracted_data.data).to eq(response) + end + end +end diff --git a/spec/lib/bulk_imports/groups/graphql/get_group_query_spec.rb b/spec/lib/bulk_imports/groups/graphql/get_group_query_spec.rb new file mode 100644 index 00000000000000..78f99c19346d46 --- /dev/null +++ b/spec/lib/bulk_imports/groups/graphql/get_group_query_spec.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe BulkImports::Groups::Graphql::GetGroupQuery do + describe '#variables' do + let(:entity) { double(source_full_path: 'test') } + + it 'returns query variables based on entity information' do + expected = { full_path: entity.source_full_path } + + expect(described_class.variables(entity)).to eq(expected) + end + end + + describe '#data_path' do + it 'returns data path' do + expected = %w[data group] + + expect(described_class.data_path).to eq(expected) + end + end + + describe '#page_info_path' do + it 'returns pagination information path' do + expected = %w[data group page_info] + + expect(described_class.page_info_path).to eq(expected) + end + end +end diff --git a/spec/lib/bulk_imports/groups/graphql/get_labels_query_spec.rb b/spec/lib/bulk_imports/groups/graphql/get_labels_query_spec.rb new file mode 100644 index 00000000000000..2d8f4fb399a7d9 --- /dev/null +++ b/spec/lib/bulk_imports/groups/graphql/get_labels_query_spec.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe BulkImports::Groups::Graphql::GetLabelsQuery do + describe '#variables' do + let(:entity) { double(source_full_path: 'test', next_page_for: 'next_page') } + + it 'returns query variables based on entity information' do + expected = { full_path: entity.source_full_path, cursor: entity.next_page_for } + + expect(described_class.variables(entity)).to eq(expected) + end + end + + describe '#data_path' do + it 'returns data path' do + expected = %w[data group labels nodes] + + expect(described_class.data_path).to eq(expected) + end + end + + describe '#page_info_path' do + it 'returns pagination information path' do + expected = %w[data group labels page_info] + + expect(described_class.page_info_path).to eq(expected) + end + end +end diff --git a/spec/lib/bulk_imports/groups/loaders/labels_loader_spec.rb b/spec/lib/bulk_imports/groups/loaders/labels_loader_spec.rb new file mode 100644 index 00000000000000..2dc360fc6b9f71 --- /dev/null +++ b/spec/lib/bulk_imports/groups/loaders/labels_loader_spec.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe BulkImports::Groups::Loaders::LabelsLoader do + describe '#load' do + let(:user) { create(:user) } + let(:group) { create(:group) } + let(:entity) { create(:bulk_import_entity, group: group) } + let(:context) do + BulkImports::Pipeline::Context.new( + entity: entity, + current_user: user + ) + end + + let(:data) do + { + 'title' => 'label', + 'description' => 'description', + 'color' => '#FFFFFF' + } + end + + it 'creates the label' do + expect { subject.load(context, data) }.to change(Label, :count).by(1) + + label = group.labels.first + + expect(label.title).to eq(data['title']) + expect(label.description).to eq(data['description']) + expect(label.color).to eq(data['color']) + end + end +end diff --git 
a/spec/lib/bulk_imports/groups/pipelines/group_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/group_pipeline_spec.rb index 1a91f3d7a788c7..445d25abd993de 100644 --- a/spec/lib/bulk_imports/groups/pipelines/group_pipeline_spec.rb +++ b/spec/lib/bulk_imports/groups/pipelines/group_pipeline_spec.rb @@ -24,19 +24,15 @@ let(:group_data) do { - 'data' => { - 'group' => { - 'name' => 'source_name', - 'fullPath' => 'source/full/path', - 'visibility' => 'private', - 'projectCreationLevel' => 'developer', - 'subgroupCreationLevel' => 'maintainer', - 'description' => 'Group Description', - 'emailsDisabled' => true, - 'lfsEnabled' => false, - 'mentionsDisabled' => true - } - } + 'name' => 'source_name', + 'full_path' => 'source/full/path', + 'visibility' => 'private', + 'project_creation_level' => 'developer', + 'subgroup_creation_level' => 'maintainer', + 'description' => 'Group Description', + 'emails_disabled' => true, + 'lfs_enabled' => false, + 'mentions_disabled' => true } end @@ -60,13 +56,13 @@ expect(imported_group).not_to be_nil expect(imported_group.parent).to eq(parent) expect(imported_group.path).to eq(group_path) - expect(imported_group.description).to eq(group_data.dig('data', 'group', 'description')) - expect(imported_group.visibility).to eq(group_data.dig('data', 'group', 'visibility')) - expect(imported_group.project_creation_level).to eq(Gitlab::Access.project_creation_string_options[group_data.dig('data', 'group', 'projectCreationLevel')]) - expect(imported_group.subgroup_creation_level).to eq(Gitlab::Access.subgroup_creation_string_options[group_data.dig('data', 'group', 'subgroupCreationLevel')]) - expect(imported_group.lfs_enabled?).to eq(group_data.dig('data', 'group', 'lfsEnabled')) - expect(imported_group.emails_disabled?).to eq(group_data.dig('data', 'group', 'emailsDisabled')) - expect(imported_group.mentions_disabled?).to eq(group_data.dig('data', 'group', 'mentionsDisabled')) + expect(imported_group.description).to eq(group_data['description']) + expect(imported_group.visibility).to eq(group_data['visibility']) + expect(imported_group.project_creation_level).to eq(Gitlab::Access.project_creation_string_options[group_data['project_creation_level']]) + expect(imported_group.subgroup_creation_level).to eq(Gitlab::Access.subgroup_creation_string_options[group_data['subgroup_creation_level']]) + expect(imported_group.lfs_enabled?).to eq(group_data['lfs_enabled']) + expect(imported_group.emails_disabled?).to eq(group_data['emails_disabled']) + expect(imported_group.mentions_disabled?).to eq(group_data['mentions_disabled']) end end @@ -87,8 +83,6 @@ it 'has transformers' do expect(described_class.transformers) .to contain_exactly( - { klass: BulkImports::Common::Transformers::HashKeyDigger, options: { key_path: %w[data group] } }, - { klass: BulkImports::Common::Transformers::UnderscorifyKeysTransformer, options: nil }, { klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil }, { klass: BulkImports::Groups::Transformers::GroupAttributesTransformer, options: nil } ) diff --git a/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb b/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb index 135995f1ff3c2b..1cdeff168aed01 100644 --- a/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb +++ b/spec/lib/bulk_imports/groups/pipelines/labels_pipeline_spec.rb @@ -5,6 +5,7 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do let(:user) { create(:user) } let(:group) { create(:group) } + let(:cursor) 
{ 'cursor' } let(:entity) do create( :bulk_import_entity, @@ -22,31 +23,26 @@ ) end - def extractor_data(title:, has_next_page:, cursor: "") - { - "data" => { - "group" => { - "labels" => { - "page_info" => { - "end_cursor" => cursor, - "has_next_page" => has_next_page - }, - "nodes" => [ - { - "title" => title, - "description" => "desc", - "color" => "#428BCA" - } - ] - } - } + def extractor_data(title:, has_next_page:, cursor: nil) + data = [ + { + 'title' => title, + 'description' => 'desc', + 'color' => '#428BCA' } + ] + + page_info = { + 'end_cursor' => cursor, + 'has_next_page' => has_next_page } + + BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info) end describe '#run' do it 'imports a group labels' do - first_page = extractor_data(title: 'label1', has_next_page: true, cursor: 'nextPageCursor') + first_page = extractor_data(title: 'label1', has_next_page: true, cursor: cursor) last_page = extractor_data(title: 'label2', has_next_page: false) allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| @@ -65,6 +61,38 @@ def extractor_data(title:, has_next_page:, cursor: "") end end + describe '#after_run' do + context 'when extracted data has next page' do + it 'updates tracker information and runs pipeline again' do + data = extractor_data(title: 'label', has_next_page: true, cursor: cursor) + + expect(subject).to receive(:run).with(context) + + subject.after_run(context, data) + + tracker = entity.trackers.find_by(relation: :labels) + + expect(tracker.has_next_page).to eq(true) + expect(tracker.next_page).to eq(cursor) + end + end + + context 'when extracted data has no next page' do + it 'updates tracker information and does not run pipeline' do + data = extractor_data(title: 'label', has_next_page: false) + + expect(subject).not_to receive(:run).with(context) + + subject.after_run(context, data) + + tracker = entity.trackers.find_by(relation: :labels) + + expect(tracker.has_next_page).to eq(false) + expect(tracker.next_page).to be_nil + end + end + end + describe 'pipeline parts' do it { expect(described_class).to include_module(BulkImports::Pipeline) } it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) } @@ -82,7 +110,6 @@ def extractor_data(title:, has_next_page:, cursor: "") it 'has transformers' do expect(described_class.transformers) .to contain_exactly( - { klass: BulkImports::Common::Transformers::HashKeyDigger, options: { key_path: %w[data group labels] } }, { klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil } ) end diff --git a/spec/lib/bulk_imports/pipeline/extracted_data_spec.rb b/spec/lib/bulk_imports/pipeline/extracted_data_spec.rb new file mode 100644 index 00000000000000..25c5178227ae47 --- /dev/null +++ b/spec/lib/bulk_imports/pipeline/extracted_data_spec.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe BulkImports::Pipeline::ExtractedData do + let(:data) { 'data' } + let(:has_next_page) { true } + let(:cursor) { 'cursor' } + let(:page_info) do + { + 'has_next_page' => has_next_page, + 'end_cursor' => cursor + } + end + + subject { described_class.new(data: data, page_info: page_info) } + + describe '#has_next_page?' 
do + context 'when next page is present' do + it 'returns true' do + expect(subject.has_next_page?).to eq(true) + end + end + + context 'when next page is not present' do + let(:has_next_page) { false } + + it 'returns false' do + expect(subject.has_next_page?).to eq(false) + end + end + end + + describe '#next_page' do + it 'returns next page cursor information' do + expect(subject.next_page).to eq(cursor) + end + end + + describe '#each' do + context 'when block is present' do + it 'yields each data item' do + expect { |b| subject.each(&b) }.to yield_control + end + end + + context 'when block is not present' do + it 'returns enumerator' do + expect(subject.each).to be_instance_of(Enumerator) + end + end + end +end diff --git a/spec/lib/bulk_imports/pipeline/runner_spec.rb b/spec/lib/bulk_imports/pipeline/runner_spec.rb index 04163d5876c732..de3489708d8dd7 100644 --- a/spec/lib/bulk_imports/pipeline/runner_spec.rb +++ b/spec/lib/bulk_imports/pipeline/runner_spec.rb @@ -53,18 +53,26 @@ def load(context); end end it 'runs pipeline extractor, transformer, loader' do - entries = [{ foo: :bar }] + extracted_data = BulkImports::Pipeline::ExtractedData.new(data: { foo: :bar }) expect_next_instance_of(BulkImports::Extractor) do |extractor| - expect(extractor).to receive(:extract).with(context).and_return(entries) + expect(extractor) + .to receive(:extract) + .with(context) + .and_return(extracted_data) end expect_next_instance_of(BulkImports::Transformer) do |transformer| - expect(transformer).to receive(:transform).with(context, entries.first).and_return(entries.first) + expect(transformer) + .to receive(:transform) + .with(context, extracted_data.data.first) + .and_return(extracted_data.data.first) end expect_next_instance_of(BulkImports::Loader) do |loader| - expect(loader).to receive(:load).with(context, entries.first) + expect(loader) + .to receive(:load) + .with(context, extracted_data.data.first) end expect_next_instance_of(Gitlab::Import::Logger) do |logger| -- GitLab
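
Note for reviewers: the per-record flow this patch introduces can be exercised in isolation with the simplified sketch below. It is an illustration, not code from the patch — only ExtractedData mirrors the class added in lib/bulk_imports/pipeline/extracted_data.rb (with plain Array() standing in for ActiveSupport's Array.wrap), while MiniPipeline and its extract/transform/load steps are hypothetical stand-ins for a real pipeline.

# Simplified, self-contained sketch of the per-record pipeline flow.
# ExtractedData mirrors lib/bulk_imports/pipeline/extracted_data.rb;
# MiniPipeline and its ETL steps are hypothetical stand-ins.

class ExtractedData
  attr_reader :data

  def initialize(data: nil, page_info: {})
    @data = Array(data) # Array() in place of Array.wrap, to avoid ActiveSupport
    @page_info = page_info
  end

  def has_next_page?
    @page_info['has_next_page']
  end

  def next_page
    @page_info['end_cursor']
  end

  def each(&block)
    data.each(&block)
  end
end

class MiniPipeline
  def run(context)
    extracted = extract(context)

    # Each record flows through transform -> load individually, so a loader
    # can observe records persisted earlier in the same run.
    extracted&.each do |entry|
      load(context, transform(context, entry))
    end

    # after_run now receives the extracted data, mirroring the new
    # after_run(context, extracted_data) signature in Pipeline::Runner.
    after_run(context, extracted)
  end

  def after_run(context, extracted)
    context[:next_page] = extracted.next_page # stands in for update_tracker_for
    run(context) if extracted.has_next_page?
  end

  private

  def extract(context)
    page = context[:pages].shift || {}
    ExtractedData.new(data: page[:nodes], page_info: page[:page_info] || {})
  end

  def transform(_context, entry)
    entry # a real pipeline would apply its transformer chain here
  end

  def load(_context, entry)
    puts "loaded #{entry['title']}"
  end
end

MiniPipeline.new.run(
  pages: [
    { nodes: [{ 'title' => 'epic1' }],
      page_info: { 'has_next_page' => true, 'end_cursor' => 'abc' } },
    { nodes: [{ 'title' => 'epic2' }],
      page_info: { 'has_next_page' => false } }
  ]
)

Running the sketch prints "loaded epic1" then "loaded epic2": after_run receives each page's ExtractedData, records the cursor, and re-runs the pipeline until has_next_page? is false — the same loop EpicsPipeline and LabelsPipeline now drive through entity.update_tracker_for.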