diff --git a/app/models/bulk_imports/tracker.rb b/app/models/bulk_imports/tracker.rb
index cf84414f9cf2b80559a933118b1727d7833d6bb2..c935f59049ebbc0435aaea3487296aad8fdc6929 100644
--- a/app/models/bulk_imports/tracker.rb
+++ b/app/models/bulk_imports/tracker.rb
@@ -22,13 +22,17 @@ class BulkImports::Tracker < ApplicationRecord
 
   state_machine :status, initial: :created do
     state :created, value: 0
     state :started, value: 1
     state :finished, value: 2
+    # Values of existing states are persisted in `status`, so they must not be
+    # renumbered; the new state takes the next unused value instead.
+    state :data_requested, value: 3
     state :failed, value: -1
     state :skipped, value: -2
 
     event :start do
       transition created: :started
+      transition data_requested: :started
     end
 
     event :finish do
@@ -48,5 +52,9 @@ class BulkImports::Tracker < ApplicationRecord
     event :fail_op do
       transition any => :failed
     end
+
+    event :data_request do
+      transition created: :data_requested
+    end
   end
 end
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index 921def1eef8a621bc8575b36c41a63fe0bed9b48..9c08f1a253cb94c8ed8dbfa182c051d85118e4f0 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -1515,6 +1515,22 @@
   :weight: 1
   :idempotent:
   :tags: []
+- :name: bulk_imports_group_relation_export
+  :feature_category: :importers
+  :has_external_dependencies:
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent:
+  :tags: []
+- :name: bulk_imports_group_relation_import
+  :feature_category: :importers
+  :has_external_dependencies:
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent:
+  :tags: []
 - :name: chat_notification
   :feature_category: :chatops
   :has_external_dependencies: true
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
index 5b41ccbdea1cac6dbcd7fc30d6cfef13d786df70..73d0cf488a2fc186b7c84b53290c3296f8d3843b 100644
--- a/app/workers/bulk_imports/entity_worker.rb
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -27,7 +27,10 @@ def perform(entity_id)
 
       Gitlab::ErrorTracking.track_exception(e, extra)
 
+      # Keep marking the entity as failed after the error has been reported:
+      # `BulkImports::Pipeline::Runner#run` checks `context.entity.failed?` to
+      # skip pipelines of a failed entity.
       entity&.fail_op
     end
   end
 end
diff --git a/app/workers/bulk_imports/group_relation_export_worker.rb b/app/workers/bulk_imports/group_relation_export_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..70874b8e250bb351f2e31b6030b54be0230acfb8
--- /dev/null
+++ b/app/workers/bulk_imports/group_relation_export_worker.rb
@@ -0,0 +1,95 @@
+# frozen_string_literal: true
+
+# Exports a single top-level group relation to an .ndjson file
+# and sends that file to the provided URL.
+#
+# @example
+#   BulkImports::GroupRelationExportWorker.new.perform(group.id, user.id, 'epics', 'https://gitlab.example/')
+#
+module BulkImports
+  class GroupRelationExportWorker # rubocop:disable Scalability/IdempotentWorker
+    include ApplicationWorker
+    include ExceptionBacktrace
+
+    feature_category :importers
+    loggable_arguments 2
+    sidekiq_options retry: false, dead: false
+
+    # @param group_id [Integer] ID of the group that owns the relation
+    # @param user_id [Integer] ID of the requesting user (unused until the permission check below exists)
+    # @param relation [String] name of the top-level relation to export, e.g. 'epics'
+    # @param url [String] URL the generated .ndjson file is POSTed to
+    def perform(group_id, user_id, relation, url)
+      # TODO: validate user permissions here
+      # TODO: validate url here
+
+      @group = ::Group.find(group_id)
+      @shared = ::Gitlab::ImportExport::Shared.new(@group)
+      @url = url
+      @export_path = @shared.export_path
+      @relation_name = relation
+      @filename = "#{@relation_name}.ndjson"
+      @relation = group_tree[:include].find { |include| include[@relation_name.to_sym] }
+
+      return unless @relation
+
+      serialize_relation
+      send_file
+    ensure
+      # Remove the exported files whether or not the upload succeeded, otherwise
+      # every export leaves its files behind on disk.
+      FileUtils.rm_rf(@shared.archive_path) if @shared&.archive_path
+    end
+
+    # Serializes `@relation` of `@group` through the ndjson writer.
+    def serialize_relation
+      ::Gitlab::ImportExport::JSON::StreamingSerializer.new(
+        @group,
+        group_tree,
+        json_writer,
+        exportable_path: ''
+      ).serialize_relation(@relation)
+    end
+
+    # POSTs the exported file together with the relation name to `@url`.
+    # Adapted from `Gitlab::ImportExport::AfterExportStrategies::WebUploadStrategy`.
+    def send_file
+      # The block form closes the file even when the request raises, and never
+      # calls `close` on nil when the file cannot be opened.
+      File.open(File.join(@export_path, @filename)) do |file|
+        options = {
+          body: {
+            file: file,
+            relation: @relation_name
+          },
+          headers: {
+            'Content-Type' => 'multipart/form-data',
+            'Content-Length' => file.size.to_s
+          }
+        }
+
+        Gitlab::HTTP.post(@url, options)
+      end
+    end
+
+    # Memoized relation tree of the group import/export configuration.
+    def group_tree
+      @group_tree ||= ::Gitlab::ImportExport::Reader.new(
+        shared: @shared,
+        config: group_config
+      ).group_tree
+    end
+
+    # Group import/export configuration as a Hash.
+    def group_config
+      ::Gitlab::ImportExport::Config.new(
+        config: ::Gitlab::ImportExport.group_config_file
+      ).to_h
+    end
+
+    # Memoized ndjson writer created for `@export_path`.
+    def json_writer
+      @json_writer ||= Gitlab::ImportExport::JSON::NdjsonWriter.new(@export_path)
+    end
+  end
+end
diff --git a/app/workers/bulk_imports/group_relation_import_worker.rb b/app/workers/bulk_imports/group_relation_import_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..a2cec2a337e69886e2e9824092f2043edb7a28f6
--- /dev/null
+++ b/app/workers/bulk_imports/group_relation_import_worker.rb
@@ -0,0 +1,37 @@
+# frozen_string_literal: true
+
+# Imports a single top-level group relation from an .ndjson file stored on disk.
+#
+# @example
+#   BulkImports::GroupRelationImportWorker.new.perform(group.id, 'epics', '/path/to/epics.ndjson')
+#
+module BulkImports
+  class GroupRelationImportWorker # rubocop:disable Scalability/IdempotentWorker
+    include ApplicationWorker
+    include ExceptionBacktrace
+
+    feature_category :importers
+    loggable_arguments 1, 2
+    sidekiq_options retry: false, dead: false
+
+    # @param group_id [Integer] namespace ID used to look up the `BulkImports::Entity`
+    # @param relation [String] relation name; only the epics pipeline is wired up, so it does not select the pipeline yet
+    # @param tmp_filepath [String] path of the .ndjson file to import
+    def perform(group_id, relation, tmp_filepath)
+      pipeline_class = EE::BulkImports::Groups::Pipelines::AnotherEpicsPipeline
+
+      entity = BulkImports::Entity.find_by(namespace_id: group_id)
+      tracker = entity&.trackers&.find_by(relation: pipeline_class.name)
+      # Nothing to import when no entity or tracker matches.
+      return unless tracker
+
+      context = BulkImports::Pipeline::Context.new(tracker)
+      context.extra[:tmp_filepath] = tmp_filepath
+
+      pipeline_class.new(context).run
+    ensure
+      # The file is only needed for this single run (retries are disabled).
+      FileUtils.rm_f(tmp_filepath) if tmp_filepath
+    end
+  end
+end
diff --git a/config/sidekiq.yml.example b/config/sidekiq.yml.example
index 714bc06cb2417077da463a909d365d11e5abaa74..36e96259d197b264f6b5dba999860bd112f58ed4 100644
--- a/config/sidekiq.yml.example
+++ b/config/sidekiq.yml.example
@@ -1,2 +1,2 @@
 ---
-:concurrency: 5
\ No newline at end of file
+:concurrency: 10
diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml
index f3fcb00fee42e3867c462333c23b0b4547e1c603..17310c631d21c7672f469859f42b3fb61fcc2c8a 100644
--- a/config/sidekiq_queues.yml
+++ b/config/sidekiq_queues.yml
@@ -56,6 +56,10 @@
   - 1
 - - bulk_imports_entity
   - 1
+- - bulk_imports_group_relation_export
+  - 1
+- - bulk_imports_group_relation_import
+  - 1
 - - chaos
   - 2
 - - chat_notification
diff --git a/ee/lib/ee/bulk_imports/groups/pipelines/another_epics_pipeline.rb b/ee/lib/ee/bulk_imports/groups/pipelines/another_epics_pipeline.rb
new file mode 100644
index 0000000000000000000000000000000000000000..c2a037a73baa4b45df9c7b51ce94330e83be1bef
--- /dev/null
+++ b/ee/lib/ee/bulk_imports/groups/pipelines/another_epics_pipeline.rb
@@ -0,0 +1,154 @@
+# frozen_string_literal: true
+
+module EE
+  module BulkImports
+    module Groups
+      module Pipelines
+        # Imports the epics of a group from an .ndjson file. The export is
+        # requested by `async_extract`; the path of the file to read is taken
+        # from `context.extra[:tmp_filepath]` when the pipeline is run.
+        class AnotherEpicsPipeline
+          include ::BulkImports::Pipeline
+
+          relation :epics
+          ndjson_pipeline!
+
+          # Asks the configured instance to export the relation and to send the
+          # result to `callback_url`.
+          def async_extract
+            client = ::BulkImports::Clients::Http.new(
+              uri: context.configuration.url,
+              token: context.configuration.access_token
+            )
+            encoded_group_path = ERB::Util.url_encode(context.entity.source_full_path)
+            resource_path = "groups/#{encoded_group_path}/export_relation"
+
+            client.post(resource_path, { relation: relation, url: callback_url })
+          end
+
+          # URL of the `import_relation` endpoint for `context.group`. It is
+          # built from the configured instance URL instead of a hard-coded host.
+          def callback_url
+            base_url = ::Gitlab.config.gitlab.url.to_s.chomp('/')
+
+            "#{base_url}/api/v4/groups/#{context.group.id}/import_relation"
+          end
+
+          # @return [Enumerator] the lines of the file at `context.extra[:tmp_filepath]`
+          def extract(context)
+            File.foreach(context.extra[:tmp_filepath])
+          end
+
+          # Builds an unsaved epic (including nested relations) from one JSON line
+          # and assigns it to `context.group`.
+          def transform(context, data)
+            hash = ActiveSupport::JSON.decode(data)
+
+            object = build_relation('epics', epic_relation_definition, hash)
+            object.assign_attributes(group_id: context.group.id)
+            object
+          end
+
+          def load(context, epic)
+            epic.save!
+          end
+
+          # No-op hook. The argument is optional so the hook can be called with or
+          # without the extracted data.
+          def after_run(_extracted_data = nil)
+            nil
+          end
+
+          # Recursively turns `data_hash` into an instance of the relation class,
+          # building nested sub-relations first. `author` hashes are returned
+          # untouched and the `author` key of a note is dropped.
+          def build_relation(relation_key, relation_definition, data_hash)
+            return data_hash if relation_key == 'author'
+
+            data_hash.delete('author') if relation_key == 'notes'
+
+            relation_key = ::Gitlab::ImportExport::Group::RelationFactory::OVERRIDES[relation_key.to_sym]&.to_s || relation_key
+
+            relation_definition.each do |sub_relation_key, sub_relation_definition|
+              transform_sub_relations!(data_hash, sub_relation_key, sub_relation_definition)
+            end
+
+            object_class = relation_class(relation_key)
+
+            object = object_class.new
+            object.assign_attributes(cleaned_data_hash(data_hash, object_class))
+            object.importing = true if object.respond_to?(:importing)
+            object
+          end
+
+          # Builds every hash of `data_hashes` and drops entries that came back nil.
+          def build_relations(relation_key, relation_definition, data_hashes)
+            data_hashes
+              .map { |data_hash| build_relation(relation_key, relation_definition, data_hash) }
+              .compact
+          end
+
+          # Replaces `data_hash[sub_relation_key]` with the built object(s), or
+          # removes the key when nothing could be built.
+          def transform_sub_relations!(data_hash, sub_relation_key, sub_relation_definition)
+            sub_data_hash = data_hash[sub_relation_key]
+            return unless sub_data_hash
+
+            current_item =
+              if sub_data_hash.is_a?(Array)
+                build_relations(
+                  sub_relation_key,
+                  sub_relation_definition,
+                  sub_data_hash).presence
+              else
+                build_relation(
+                  sub_relation_key,
+                  sub_relation_definition,
+                  sub_data_hash)
+              end
+
+            if current_item
+              data_hash[sub_relation_key] = current_item
+            else
+              data_hash.delete(sub_relation_key)
+            end
+          end
+
+          def excluded_keys_for_relation(relation)
+            reader.attributes_finder.find_excluded_keys(relation)
+          end
+
+          def cleaned_data_hash(data_hash, data_class)
+            ::Gitlab::ImportExport::AttributeCleaner.clean(relation_hash: data_hash, relation_class: data_class)
+          end
+
+          # Resolves the class for a relation name via `classify`; falls back to
+          # the name exactly as given when the classified form is not a constant.
+          def relation_class(relation_name)
+            relation_name.to_s.classify.constantize
+          rescue NameError
+            relation_name.to_s.constantize
+          end
+
+          # Shape of the definition (shown with symbol keys, the returned hash has string keys):
+          # {:parent=>{}, :award_emoji=>{}, :events=>{:push_event_payload=>{}}, :notes=>{:author=>{}, :award_emoji=>{}, :events=>{:push_event_payload=>{}}}}
+          def epic_relation_definition
+            @relation_definition ||= reader.attributes_finder.find_relations_tree(:group).deep_stringify_keys['epics']
+          end
+
+          def reader
+            @reader ||= ::Gitlab::ImportExport::Reader.new(
+              shared: nil,
+              config: group_config
+            )
+          end
+
+          def group_config
+            @group_config ||= ::Gitlab::ImportExport::Config.new(
+              config: ::Gitlab::ImportExport.group_config_file
+            ).to_h
+          end
+        end
+      end
+    end
+  end
+end
diff --git a/ee/lib/ee/bulk_imports/importers/group_importer.rb b/ee/lib/ee/bulk_imports/importers/group_importer.rb
index ad05e52f9afa62d77e295047dfd0adf91f97d275..ac1548f0f08eca490d22f0ba7af88f6103ac671b 100644
--- a/ee/lib/ee/bulk_imports/importers/group_importer.rb
+++ b/ee/lib/ee/bulk_imports/importers/group_importer.rb
@@ -11,10 +11,11 @@ module GroupImporter
         override :pipelines
         def pipelines
           super + [
-            EE::BulkImports::Groups::Pipelines::EpicsPipeline,
-            EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline,
-            EE::BulkImports::Groups::Pipelines::EpicEventsPipeline,
-            EE::BulkImports::Groups::Pipelines::IterationsPipeline
+            EE::BulkImports::Groups::Pipelines::AnotherEpicsPipeline,
+            # EE::BulkImports::Groups::Pipelines::EpicsPipeline,
+            # EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline,
+            # EE::BulkImports::Groups::Pipelines::EpicEventsPipeline,
+            # EE::BulkImports::Groups::Pipelines::IterationsPipeline
           ]
         end
       end
diff --git a/lib/api/group_export.rb b/lib/api/group_export.rb
index 29ffbea687a3ba167f35570303242e4733fd8f8d..4df635c16dfd63ab824c87e5ba7f2648174008a6 100644
--- a/lib/api/group_export.rb
+++ b/lib/api/group_export.rb
@@ -43,6 +43,25 @@ class GroupExport < ::API::Base
           render_api_error!(message: 'Group export could not be started.')
         end
       end
+
+      desc 'Export group relation' do
+        detail 'This feature was introduced in GitLab 13.11.'
+      end
+      params do
+        requires :relation, type: String, desc: 'Relation to export'
+        requires :url, type: String, desc: 'The URL to send exported relation to'
+      end
+      post ':id/export_relation' do
+        # validate URL here
+        # check rate limit here
+
+        ::BulkImports::GroupRelationExportWorker.perform_async(
+          user_group.id,
+          current_user.id,
+          params[:relation],
+          params[:url]
+        )
+      end
     end
   end
 end
diff --git a/lib/api/group_import.rb b/lib/api/group_import.rb
index 4a752732652da9ef1abf8c6139a3976189ceccdb..f65de60b4838114319fb3a4b0637bd828ff4fdc1 100644
--- a/lib/api/group_import.rb
+++ b/lib/api/group_import.rb
@@ -79,6 +79,37 @@ def closest_allowed_visibility_level
           render_api_error!("Failed to save group #{group.errors.messages}", 400)
         end
       end
+
+      desc 'Import group relation' do
+        detail 'This feature was introduced in GitLab 13.11.'
+      end
+      content_type :ndjson, 'application/x-ndjson'
+      params do
+        requires :id, type: String, desc: 'The ID of a group'
+        requires :relation, type: String, desc: 'Type of relation to import'
+        requires :file, type: File, desc: 'Ndjson file to import'
+      end
+      post ':id/import_relation' do
+        # NOTE(review): assumes `user_group` resolves the group identified by
+        # `params['id']`, as it already does for the worker arguments - confirm.
+        shared = ::Gitlab::ImportExport::Shared.new(user_group)
+        tmp_folder = shared.base_path
+        uploaded_file = params['file']['tempfile']
+        # The file name is client-controlled: keep only its base name so the
+        # destination cannot point outside of `tmp_folder` (e.g. "../../x").
+        destination_filepath = File.join(tmp_folder, File.basename(params['file']['filename']))
+
+        # copy file to temp location
+        FileUtils.mkdir_p(tmp_folder, mode: 0700)
+        FileUtils.chmod(0700, tmp_folder)
+        FileUtils.copy_entry(uploaded_file.path, destination_filepath)
+
+        ::BulkImports::GroupRelationImportWorker.perform_async(
+          user_group.id,
+          params['relation'],
+          destination_filepath
+        )
+      end
     end
   end
 end
diff --git a/lib/bulk_imports/clients/http.rb b/lib/bulk_imports/clients/http.rb
index ef99122cdfde645c2b0a10b27e39722cb091c973..c69b7c3f1951ffb300ff4f80720d2b414479867d 100644
--- a/lib/bulk_imports/clients/http.rb
+++ b/lib/bulk_imports/clients/http.rb
@@ -28,6 +28,18 @@ def get(resource, query = {})
         end
       end
 
+      # POSTs `body`, serialized to JSON, to the URL of `resource`.
+      def post(resource, body)
+        with_error_handling do
+          Gitlab::HTTP.post(
+            resource_url(resource),
+            headers: request_headers,
+            follow_redirects: false,
+            body: body.to_json
+          )
+        end
+      end
+
       def each_page(method, resource, query = {}, &block)
         return to_enum(__method__, method, resource, query) unless block_given?
 
diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb
index 2cd97c687e070d693fccf874bf5c78a4040614d8..719bac769cd0eeecae2397b761f851af445497e5 100644
--- a/lib/bulk_imports/importers/group_importer.rb
+++ b/lib/bulk_imports/importers/group_importer.rb
@@ -31,11 +31,11 @@ def execute
       def pipelines
         [
           BulkImports::Groups::Pipelines::GroupPipeline,
-          BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline,
-          BulkImports::Groups::Pipelines::MembersPipeline,
-          BulkImports::Groups::Pipelines::LabelsPipeline,
-          BulkImports::Groups::Pipelines::MilestonesPipeline,
-          BulkImports::Groups::Pipelines::BadgesPipeline
+          # BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline,
+          # BulkImports::Groups::Pipelines::MembersPipeline,
+          # BulkImports::Groups::Pipelines::LabelsPipeline,
+          # BulkImports::Groups::Pipelines::MilestonesPipeline,
+          # BulkImports::Groups::Pipelines::BadgesPipeline
         ]
       end
     end
diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb
index df4f020d6b2f1126d0b0761c32b910ec9750f92b..6610bd97a487652f17118f2b5b9bfa7b8c2f1308 100644
--- a/lib/bulk_imports/pipeline.rb
+++ b/lib/bulk_imports/pipeline.rb
@@ -120,6 +120,16 @@ def instantiate(class_config)
       def abort_on_failure?
         self.class.abort_on_failure?
       end
+
+      # Whether the pipeline class called `ndjson_pipeline!`.
+      def ndjson_pipeline?
+        self.class.ndjson_pipeline?
+      end
+
+      # Relation declared on the pipeline class with `relation`.
+      def relation
+        self.class.get_relation
+      end
     end
 
     class_methods do
@@ -155,6 +165,26 @@ def abort_on_failure?
         class_attributes[:abort_on_failure]
       end
 
+      # Declares the relation handled by the pipeline, e.g. `relation :epics`.
+      def relation(relation)
+        class_attributes[:relation] = relation
+      end
+
+      # Returns the relation declared with `relation` (nil when none was declared).
+      def get_relation
+        class_attributes[:relation]
+      end
+
+      # Flags the pipeline as an ndjson pipeline.
+      def ndjson_pipeline!
+        class_attributes[:ndjson_pipeline] = true
+      end
+
+      # Truthy only after `ndjson_pipeline!` was called on the class.
+      def ndjson_pipeline?
+        class_attributes[:ndjson_pipeline]
+      end
+
       private
 
       def add_attribute(sym, klass, options)
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index b756fba3bee5466684c0fb3d75d6f3d5661dbeee..9925bebb3bc672829014ec0fe17ae3c6e4e1bed5 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -10,6 +10,14 @@ module Runner
       def run
         raise MarkedAsFailedError if context.entity.failed?
 
+        # An ndjson pipeline whose tracker is still `created` only requests its
+        # data and stops; the steps below run once the tracker has left that state.
+        if ndjson_pipeline? && tracker.created? && respond_to?(:async_extract)
+          async_extract
+          tracker.data_request!
+          return
+        end
+
         info(message: 'Pipeline started')
 
         extracted_data = extracted_data_from
@@ -40,6 +48,11 @@ def run
         info(message: 'Pipeline finished')
       rescue MarkedAsFailedError
         skip!('Skipping pipeline due to failed entity')
+      rescue => e
+        # Log the failure, then re-raise: swallowing the error here would make a
+        # broken pipeline look like a successful run to the caller.
+        info(message: "Pipeline failed: #{e.message}")
+        raise
       end
 
       private # rubocop:disable Lint/UselessAccessModifier
diff --git a/lib/gitlab/import_export/json/streaming_serializer.rb b/lib/gitlab/import_export/json/streaming_serializer.rb
index 05b7679e0ff4222afe60c822c115066605ff2c1f..346fbb6b7737e1b1eadb1954bdeee59949b82a25 100644
--- a/lib/gitlab/import_export/json/streaming_serializer.rb
+++ b/lib/gitlab/import_export/json/streaming_serializer.rb
@@ -64,6 +64,9 @@ def serialize_relation(definition)
           end
         end
 
+        # Only `serialize_relation` is exposed; the readers and `serialize_root` stay private.
+        public :serialize_relation
+
         def serialize_many_relations(key, records, options)
           enumerator = Enumerator.new do |items|
             key_preloads = preloads&.dig(key)