diff --git a/app/models/import/source_user_placeholder_reference.rb b/app/models/import/source_user_placeholder_reference.rb index 66c6647e3d1386f1b793f4828889626c8740a901..9441336e48d84a47d3c6957ca66006bed4385c8e 100644 --- a/app/models/import/source_user_placeholder_reference.rb +++ b/app/models/import/source_user_placeholder_reference.rb @@ -2,6 +2,8 @@ module Import class SourceUserPlaceholderReference < ApplicationRecord + include BulkInsertSafe + self.table_name = 'import_source_user_placeholder_references' belongs_to :source_user, class_name: 'Import::SourceUser' @@ -16,6 +18,36 @@ class SourceUserPlaceholderReference < ApplicationRecord attribute :composite_key, :ind_jsonb + # If an element is ever added to this array, ensure that `.from_serialized` handles receiving + # older versions of the array by filling in missing values with defaults. We'd keep that in place + # for at least one release cycle to ensure backward compatibility. + SERIALIZABLE_ATTRIBUTES = %w[ + composite_key + model + namespace_id + numeric_key + source_user_id + user_reference_column + ].freeze + + SerializationError = Class.new(StandardError) + + def to_serialized + Gitlab::Json.dump(attributes.slice(*SERIALIZABLE_ATTRIBUTES).to_h.values) + end + + class << self + def from_serialized(serialized_reference) + deserialized = Gitlab::Json.parse(serialized_reference) + + raise SerializationError if deserialized.size != SERIALIZABLE_ATTRIBUTES.size + + attributes = SERIALIZABLE_ATTRIBUTES.zip(deserialized).to_h + + new(attributes.merge(created_at: Time.zone.now)) + end + end + private def validate_numeric_or_composite_key_present diff --git a/app/services/import/placeholder_references/base_service.rb b/app/services/import/placeholder_references/base_service.rb new file mode 100644 index 0000000000000000000000000000000000000000..2aa083d796a1eb65fa60d3abd34480dbadbd3321 --- /dev/null +++ b/app/services/import/placeholder_references/base_service.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module Import + module PlaceholderReferences + class BaseService + include Services::ReturnServiceResponses + + def initialize(import_source:, import_uid:) + @import_source = import_source + @import_uid = import_uid + end + + private + + attr_reader :import_source, :import_uid, :reference + + def cache + Gitlab::Cache::Import::Caching + end + + def cache_key + @cache_key ||= [:'placeholder-reference', import_source, import_uid].join(':') + end + + def logger + Framework::Logger + end + + def log_info(...) + logger.info(logger_params(...)) + end + + def log_error(...) + logger.error(logger_params(...)) + end + + def logger_params(message:, **params) + params.merge( + message: message, + import_source: import_source, + import_uid: import_uid + ) + end + end + end +end diff --git a/app/services/import/placeholder_references/load_service.rb b/app/services/import/placeholder_references/load_service.rb new file mode 100644 index 0000000000000000000000000000000000000000..c18a155a62216eaa06ef820be8794e99060b306d --- /dev/null +++ b/app/services/import/placeholder_references/load_service.rb @@ -0,0 +1,99 @@ +# frozen_string_literal: true + +module Import + module PlaceholderReferences + class LoadService < BaseService + BATCH_LIMIT = 500 + + def initialize(import_source:, import_uid:) + super(import_source: import_source, import_uid: import_uid) + + @processed_count = 0 + @error_count = 0 + end + + def execute + log_info(message: 'Processing placeholder references') + + while (batch = next_batch).present? 
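+          # Each iteration pulls up to BATCH_LIMIT random members from the Redis
+          # set, bulk-inserts them into PostgreSQL, then removes them from the set.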
+          load!(batch)
+
+          # End the loop when a batch comes back smaller than the limit, which
+          # means the set has been drained. Otherwise an import writing to Redis
+          # at the same time could keep this loop running indefinitely,
+          # processing just a few records per iteration.
+          break if batch.size < BATCH_LIMIT
+        end
+
+        log_info(
+          message: 'Processed placeholder references',
+          processed_count: processed_count,
+          error_count: error_count
+        )
+
+        success(processed_count: processed_count, error_count: error_count)
+      end
+
+      private
+
+      attr_accessor :error_count, :processed_count
+
+      def next_batch
+        cache.limited_values_from_set(cache_key, limit: BATCH_LIMIT)
+      end
+
+      def load!(batch)
+        to_load = batch.filter_map do |item|
+          SourceUserPlaceholderReference.from_serialized(item)
+        rescue JSON::ParserError, SourceUserPlaceholderReference::SerializationError => e
+          log_error(item, e)
+          nil
+        end
+
+        begin
+          bulk_insert!(to_load)
+        rescue ActiveRecord::RecordInvalid => e
+          # We optimise for all records being valid and only filter for validity
+          # when there is a problem
+          to_load.reject! do |item|
+            next false if item.valid?
+
+            log_error(item.attributes, e)
+            true
+          end
+
+          # Try again
+          bulk_insert!(to_load)
+        rescue ActiveRecord::InvalidForeignKey => e
+          # This is an unrecoverable situation where we allow the error to clear the batch
+          log_error(to_load, e)
+        end
+
+        clear_batch!(batch)
+      end
+
+      def bulk_insert!(to_load)
+        Import::SourceUserPlaceholderReference.bulk_insert!(to_load)
+      end
+
+      def clear_batch!(batch)
+        processed_count = batch.size
+
+        self.processed_count += processed_count
+
+        cache.set_remove(cache_key, batch)
+      end
+
+      def log_error(item, exception)
+        super(
+          message: 'Error processing placeholder reference',
+          item: item,
+          exception: {
+            class: exception.class,
+            message: exception.message
+          }
+        )
+
+        self.error_count += 1
+      end
+    end
+  end
+end
diff --git a/app/services/import/placeholder_references/push_service.rb b/app/services/import/placeholder_references/push_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..5f1c119c9a373a9c9d27d9fb99c4aa3d26c687ea
--- /dev/null
+++ b/app/services/import/placeholder_references/push_service.rb
@@ -0,0 +1,55 @@
+# frozen_string_literal: true
+
+module Import
+  module PlaceholderReferences
+    class PushService < BaseService
+      class << self
+        def from_record(import_source:, import_uid:, source_user:, record:, user_reference_column:, composite_key: nil)
+          numeric_key = record.id if composite_key.nil? && record.id.is_a?(Integer)
+
+          new(
+            import_source: import_source,
+            import_uid: import_uid,
+            model: record.class,
+            composite_key: composite_key,
+            numeric_key: numeric_key,
+            source_user_id: source_user.id,
+            source_user_namespace_id: source_user.namespace_id,
+            user_reference_column: user_reference_column
+          )
+        end
+      end
+
+      def initialize(import_source:, import_uid:, source_user_id:, source_user_namespace_id:, model:, user_reference_column:, numeric_key: nil, composite_key: nil) # rubocop:disable Layout/LineLength -- It's easier to read on one line
+        super(import_source: import_source, import_uid: import_uid)
+
+        @reference = Import::SourceUserPlaceholderReference.new(
+          model: model.name,
+          source_user_id: source_user_id,
+          namespace_id: source_user_namespace_id,
+          user_reference_column: user_reference_column,
+          numeric_key: numeric_key,
+          composite_key: composite_key
+        )
+      end
+
+      def execute
+        return error(reference.errors.full_messages, :bad_request) unless reference.valid?
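+        # The reference is stored in Redis as a compact JSON array (see
+        # SourceUserPlaceholderReference#to_serialized) until LoadService drains
+        # the set in batches and bulk-inserts the rows into PostgreSQL.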
+ + serialized_reference = reference.to_serialized + + cache.set_add(cache_key, serialized_reference, timeout: cache_ttl) + + success(serialized_reference: serialized_reference) + end + + private + + attr_reader :reference + + def cache_ttl + Gitlab::Cache::Import::Caching::TIMEOUT + end + end + end +end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 9bb4943747163f9dfd242b7aa82a182c1bba3cca..f32d5a3441309e17c971fdd9d2ff507f45c7356a 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -3225,6 +3225,15 @@ :weight: 2 :idempotent: true :tags: [] +- :name: import_load_placeholder_references + :worker_name: Import::LoadPlaceholderReferencesWorker + :feature_category: :importers + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: import_refresh_import_jid :worker_name: Gitlab::Import::RefreshImportJidWorker :feature_category: :importers diff --git a/app/workers/import/load_placeholder_references_worker.rb b/app/workers/import/load_placeholder_references_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..fa6089181b0fed91144cac0a08e0a8ac47bb62c7 --- /dev/null +++ b/app/workers/import/load_placeholder_references_worker.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +module Import + class LoadPlaceholderReferencesWorker + include ApplicationWorker + + data_consistency :delayed + deduplicate :until_executed, if_deduplicated: :reschedule_once + idempotent! + feature_category :importers + loggable_arguments 0, 1 + + def perform(import_source, import_uid, params = {}) + return unless Feature.enabled?(:bulk_import_user_mapping, User.actor_from_id(params['current_user_id'])) + + ::Import::PlaceholderReferences::LoadService.new( + import_source: import_source, + import_uid: import_uid + ).execute + end + end +end diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index a1b5580af77544c5fe6265beb028f5eac3121f96..00a19602d8e0efe50e7f44834bd5f1322a010cd2 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -399,6 +399,8 @@ - 1 - - import_issues_csv - 2 +- - import_load_placeholder_references + - 1 - - import_refresh_import_jid - 1 - - incident_management diff --git a/lib/gitlab/cache/import/caching.rb b/lib/gitlab/cache/import/caching.rb index 0b24137076dd662d6962516e43c376e8823f90a5..beaf9eb666fd95a1a105dc93de6d2079f20f6225 100644 --- a/lib/gitlab/cache/import/caching.rb +++ b/lib/gitlab/cache/import/caching.rb @@ -154,6 +154,30 @@ def self.values_from_set(raw_key) end end + # Returns a limited number of random values from the set. + # + # raw_key - The key of the set to check. + # limit - Number of values to return (default: 1). + def self.limited_values_from_set(raw_key, limit: 1) + key = cache_key_for(raw_key) + + with_redis do |redis| + redis.srandmember(key, limit) + end + end + + # Removes the given values from the set. + # + # raw_key - The key of the set to check. + # values - Array of values to remove from set. + def self.set_remove(raw_key, values = []) + key = cache_key_for(raw_key) + + with_redis do |redis| + redis.srem(key, values) + end + end + # Sets multiple keys to given values. # # mapping - A Hash mapping the cache keys to their values. @@ -263,7 +287,7 @@ def self.hash_increment(raw_key, field, value, timeout: TIMEOUT) # raw_key - The key of the list to add to. # value - The field value to add to the list. # timeout - The new timeout of the key. 
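+  # The value is validated with validate_redis_value! before being added.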
- # limit - The maximum number of members in the set. Older members will be trimmed to this limit. + # limit - The maximum number of members in the list. Older members will be trimmed to this limit. def self.list_add(raw_key, value, timeout: TIMEOUT, limit: nil) validate_redis_value!(value) diff --git a/spec/lib/gitlab/cache/import/caching_spec.rb b/spec/lib/gitlab/cache/import/caching_spec.rb index 9bc24e2ef956b92ee623733979627b322a014793..948961805838f0d94116936dcdb481e3ebff9612 100644 --- a/spec/lib/gitlab/cache/import/caching_spec.rb +++ b/spec/lib/gitlab/cache/import/caching_spec.rb @@ -139,6 +139,60 @@ end end + describe '.limited_values_from_set' do + it 'returns empty array when the set does not exist' do + expect(described_class.limited_values_from_set('foo')).to eq([]) + end + + it 'returns a single random member from the set' do + described_class.set_add('foo', 10) + described_class.set_add('foo', 20) + + result = described_class.limited_values_from_set('foo') + + expect(result.size).to eq(1) + expect(result.first).to be_in(%w[10 20]) + end + + it 'returns multiple random members from the set with `limit:`' do + described_class.set_add('foo', 10) + described_class.set_add('foo', 20) + described_class.set_add('foo', 30) + + result = described_class.limited_values_from_set('foo', limit: 2) + + expect(result.size).to eq(2) + expect(result).to all(be_in(%w[10 20 30])) + end + end + + describe '.set_remove' do + it 'returns 0 when the set does not exist' do + expect(described_class.set_remove('foo', 1)).to eq(0) + end + + it 'removes a single value from the set' do + described_class.set_add('foo', 10) + described_class.set_add('foo', 20) + + result = described_class.set_remove('foo', 20) + + expect(result).to eq(1) + expect(described_class.values_from_set('foo')).to contain_exactly('10') + end + + it 'removes a collection of values from the set' do + described_class.set_add('foo', 10) + described_class.set_add('foo', 20) + described_class.set_add('foo', 30) + + result = described_class.set_remove('foo', [10, 30]) + + expect(result).to eq(2) + expect(described_class.values_from_set('foo')).to contain_exactly('20') + end + end + describe '.hash_add' do it 'adds a value to a hash' do described_class.hash_add('foo', 1, 1) @@ -286,7 +340,7 @@ end describe '.values_from_list' do - it 'returns empty hash when the list is empty' do + it 'returns empty array when the list is empty' do expect(described_class.values_from_list('foo')).to eq([]) end diff --git a/spec/models/import/source_user_placeholder_reference_spec.rb b/spec/models/import/source_user_placeholder_reference_spec.rb index e441e065f59d4f97fd0bc73a2517eabeb4c16195..f003014564a04fe7497065e11a48d79142f5162e 100644 --- a/spec/models/import/source_user_placeholder_reference_spec.rb +++ b/spec/models/import/source_user_placeholder_reference_spec.rb @@ -42,4 +42,78 @@ def validation_errors(...) expect { reference.source_user.destroy! }.to change { described_class.count }.by(-1) end + + describe 'SERIALIZABLE_ATTRIBUTES' do + subject(:constant) { described_class::SERIALIZABLE_ATTRIBUTES } + + it 'is the expected list of attribute names' do + expected_elements = %w[ + composite_key + model + namespace_id + numeric_key + source_user_id + user_reference_column + ] + + failure_message = <<-MSG + Before fixing this spec, ensure that `#{described_class.name}.from_serialized` + handles receiving older versions of the array by filling in missing values with defaults. + + You are seeing this message because SERIALIZABLE_ATTRIBUTES has changed. 
+ MSG + + expect(constant).to eq(expected_elements), failure_message + end + end + + describe '#to_serialized' do + let(:reference) do + build(:import_source_user_placeholder_reference, + numeric_key: 1, + namespace_id: 2, + source_user_id: 3, + user_reference_column: 'foo', + model: 'Model', + composite_key: { key: 1 } + ) + end + + subject(:serialized) { reference.to_serialized } + + it { is_expected.to eq('[{"key":1},"Model",2,1,3,"foo"]') } + end + + describe '.from_serialized' do + subject(:from_serialized) { described_class.from_serialized(serialized) } + + context 'when serialized reference is valid' do + let(:serialized) { '[{"key":1},"Model",2,null,3,"foo"]' } + + it 'returns a valid SourceUserPlaceholderReference' do + expect(from_serialized).to be_a(described_class) + .and(be_valid) + .and(have_attributes( + composite_key: { key: 1 }, + model: 'Model', + numeric_key: nil, + namespace_id: 2, + source_user_id: 3, + user_reference_column: 'foo' + )) + end + + it 'sets the created_at' do + expect(from_serialized.created_at).to be_like_time(Time.zone.now) + end + end + + context 'when serialized reference has different number of elements than expected' do + let(:serialized) { '[{"key":1},"Model",2,null,3]' } + + it 'raises an exception' do + expect { from_serialized }.to raise_error(described_class::SerializationError) + end + end + end end diff --git a/spec/services/import/placeholder_references/load_service_spec.rb b/spec/services/import/placeholder_references/load_service_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..b12646aa4aebf7d0809a3e83ea9be12e35907aef --- /dev/null +++ b/spec/services/import/placeholder_references/load_service_spec.rb @@ -0,0 +1,280 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Import::PlaceholderReferences::LoadService, feature_category: :importers do + let(:import_source) { Import::SOURCE_DIRECT_TRANSFER } + let(:import_uid) { 1 } + + describe '#execute', :aggregate_failures, :clean_gitlab_redis_shared_state do + let_it_be(:source_user) { create(:import_source_user) } + let_it_be(:valid_reference) do + { + composite_key: nil, + numeric_key: 1, + model: 'Foo', + namespace_id: source_user.namespace_id, + source_user_id: source_user.id, + user_reference_column: 'user_id' + } + end + + let(:valid_reference_2) { valid_reference.merge(model: 'Bar') } + let(:valid_reference_3) { valid_reference.merge(model: 'Baz') } + + subject(:result) { described_class.new(import_source: import_source, import_uid: import_uid).execute } + + it 'loads data pushed with `Import::PlaceholderReferences::PushService`' do + record = create(:note) + + Import::PlaceholderReferences::PushService.from_record( + import_source: import_source, + import_uid: import_uid, + source_user: source_user, + record: record, + user_reference_column: :author_id + ).execute + + expect(Import::SourceUserPlaceholderReference) + .to receive(:bulk_insert!) 
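+        # bulk_insert! is available because the model includes BulkInsertSafe.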
+        .with(kind_of(Array))
+        .and_call_original
+
+      expect_log_message(:info, message: 'Processing placeholder references')
+      expect_log_message(:info, message: 'Processed placeholder references', processed_count: 1, error_count: 0)
+      expect(result).to be_success
+      expect(result.payload).to eq(processed_count: 1, error_count: 0)
+      expect(Import::SourceUserPlaceholderReference.all).to contain_exactly(
+        have_attributes(
+          composite_key: nil,
+          numeric_key: record.id,
+          model: record.class.name,
+          namespace_id: source_user.namespace_id,
+          source_user_id: source_user.id,
+          user_reference_column: 'author_id'
+        )
+      )
+      expect(set_members_count).to eq(0)
+    end
+
+    it 'loads data to PostgreSQL in batches' do
+      push(valid_reference)
+      push(valid_reference_2)
+      push(valid_reference_3)
+
+      stub_const("#{described_class}::BATCH_LIMIT", 2)
+
+      expect(Import::SourceUserPlaceholderReference)
+        .to receive(:bulk_insert!)
+        .with(kind_of(Array))
+        .twice
+        .and_call_original
+      expect(Gitlab::Cache::Import::Caching)
+        .to receive(:limited_values_from_set)
+        .twice
+        .and_call_original
+      expect_log_message(:info, message: 'Processing placeholder references')
+      expect_log_message(:info, message: 'Processed placeholder references', processed_count: 3, error_count: 0)
+      expect(result).to be_success
+      expect(result.payload).to eq(processed_count: 3, error_count: 0)
+      expect(Import::SourceUserPlaceholderReference.all).to contain_exactly(
+        have_attributes(valid_reference),
+        have_attributes(valid_reference_2),
+        have_attributes(valid_reference_3)
+      )
+      expect(set_members_count).to eq(0)
+    end
+
+    it 'does not load data for another import_uid' do
+      push(valid_reference)
+
+      result = described_class.new(import_source: import_source, import_uid: 2).execute
+
+      expect(result).to be_success
+      expect(result.payload).to eq(processed_count: 0, error_count: 0)
+      expect(Import::SourceUserPlaceholderReference.count).to eq(0)
+      expect(set_members_count).to eq(1)
+    end
+
+    it 'does not load data for another import_source' do
+      push(valid_reference)
+
+      result = described_class.new(
+        import_source: ::Import::SOURCE_PROJECT_EXPORT_IMPORT,
+        import_uid: import_uid
+      ).execute
+
+      expect(result).to be_success
+      expect(result.payload).to eq(processed_count: 0, error_count: 0)
+      expect(Import::SourceUserPlaceholderReference.count).to eq(0)
+      expect(set_members_count).to eq(1)
+    end
+
+    context 'when something in the batch has an unexpected schema' do
+      let(:invalid_reference) { valid_reference.merge(foo: 'bar') }
+
+      before do
+        stub_const("#{described_class}::BATCH_LIMIT", 2)
+
+        push(valid_reference)
+        push(valid_reference_2)
+        # We can't use `#push` here because building the model raises on the
+        # unknown `foo` attribute, so push the malformed payload directly to the set.
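+        # The extra `foo` key gives this payload 7 elements rather than the 6 in
+        # SERIALIZABLE_ATTRIBUTES, so `.from_serialized` raises a SerializationError.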
+        Gitlab::Cache::Import::Caching.set_add(cache_key, invalid_reference.values.to_json)
+        push(valid_reference_3)
+      end
+
+      it 'loads just the valid data, and clears the set' do
+        expect_log_message(:error,
+          message: 'Error processing placeholder reference',
+          exception: {
+            class: Import::SourceUserPlaceholderReference::SerializationError,
+            message: 'Import::SourceUserPlaceholderReference::SerializationError'
+          },
+          item: invalid_reference.values.to_json
+        )
+        expect_log_message(:info, message: 'Processed placeholder references', processed_count: 4, error_count: 1)
+        expect(result).to be_success
+        expect(result.payload).to eq(processed_count: 4, error_count: 1)
+        expect(Import::SourceUserPlaceholderReference.all).to contain_exactly(
+          have_attributes(valid_reference),
+          have_attributes(valid_reference_2),
+          have_attributes(valid_reference_3)
+        )
+        expect(set_members_count).to eq(0)
+      end
+    end
+
+    context 'when loading to PostgreSQL fails due to an ActiveRecord::RecordInvalid' do
+      let(:invalid_reference) { valid_reference.except(:source_user_id) }
+
+      before do
+        stub_const("#{described_class}::BATCH_LIMIT", 2)
+
+        push(invalid_reference)
+        push(valid_reference)
+        push(valid_reference_2)
+        push(valid_reference_3)
+      end
+
+      it 'loads just the valid data, and clears the set' do
+        expect_log_message(:error,
+          message: 'Error processing placeholder reference',
+          exception: {
+            class: ActiveRecord::RecordInvalid,
+            message: "Validation failed: Source user can't be blank"
+          },
+          item: hash_including(invalid_reference.stringify_keys)
+        )
+        expect(result).to be_success
+        expect(result.payload).to eq(processed_count: 4, error_count: 1)
+        expect(Import::SourceUserPlaceholderReference.all).to contain_exactly(
+          have_attributes(valid_reference),
+          have_attributes(valid_reference_2),
+          have_attributes(valid_reference_3)
+        )
+        expect(set_members_count).to eq(0)
+      end
+    end
+
+    context 'when loading to PostgreSQL fails due to ActiveRecord::InvalidForeignKey' do
+      let(:invalid_reference) { valid_reference.merge(source_user_id: non_existing_record_id) }
+
+      before do
+        stub_const("#{described_class}::BATCH_LIMIT", 2)
+
+        push(invalid_reference)
+        push(valid_reference)
+        push(valid_reference_2)
+        push(valid_reference_3)
+      end
+
+      it 'logs the error and clears the failing batch but continues' do
+        expect_log_message(:error,
+          message: 'Error processing placeholder reference',
+          exception: {
+            class: ActiveRecord::InvalidForeignKey,
+            message: include('PG::ForeignKeyViolation')
+          },
+          item: have_attributes(size: 2).and(include(have_attributes(invalid_reference)))
+        )
+        expect(result).to be_success
+        expect(Import::SourceUserPlaceholderReference.count).to eq(2)
+        expect(set_members_count).to eq(0)
+      end
+    end
+
+    context 'when loading to PostgreSQL fails for an unhandled reason' do
+      before do
+        allow(Import::SourceUserPlaceholderReference)
+          .to receive(:bulk_insert!)
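+          # ConnectionTimeoutError stands in for any error class the service does not rescue.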
+          .and_raise(ActiveRecord::ConnectionTimeoutError)
+      end
+
+      it 'bubbles the exception and does not clear the set' do
+        push(valid_reference)
+
+        expect { result }.to raise_error(ActiveRecord::ConnectionTimeoutError)
+        expect(Import::SourceUserPlaceholderReference.count).to eq(0)
+        expect(set_members_count).to eq(1)
+      end
+    end
+
+    context 'when fetching set from Redis fails' do
+      before do
+        allow(Gitlab::Cache::Import::Caching)
+          .to receive(:limited_values_from_set)
+          .and_raise(Redis::ConnectionError)
+      end
+
+      it 'bubbles the exception, does not load any data, and does not clear the set' do
+        push(valid_reference)
+
+        expect { result }.to raise_error(Redis::ConnectionError)
+        expect(Import::SourceUserPlaceholderReference.count).to eq(0)
+        expect(set_members_count).to eq(1)
+      end
+    end
+
+    context 'when clearing the set from Redis fails' do
+      before do
+        allow(Gitlab::Cache::Import::Caching)
+          .to receive(:set_remove)
+          .and_raise(Redis::ConnectionError)
+      end
+
+      it 'bubbles the exception and does not clear the set, but does load the data' do
+        push(valid_reference)
+
+        expect { result }.to raise_error(Redis::ConnectionError)
+        # (We tolerate that this leads to duplicate records loaded to PostgreSQL!)
+        expect(Import::SourceUserPlaceholderReference.count).to eq(1)
+        expect(set_members_count).to eq(1)
+      end
+    end
+
+    def push(reference)
+      serialized = Import::SourceUserPlaceholderReference.new(reference).to_serialized
+      Gitlab::Cache::Import::Caching.set_add(cache_key, serialized)
+    end
+
+    def expect_log_message(type, **params)
+      allow(Import::Framework::Logger).to receive(type)
+      expect(Import::Framework::Logger).to receive(type)
+        .with(params.merge(import_source: import_source, import_uid: import_uid))
+    end
+
+    def set_members_count
+      Gitlab::Cache::Import::Caching.with_redis do |redis|
+        redis.scard(Gitlab::Cache::Import::Caching.cache_key_for(cache_key))
+      end
+    end
+
+    def cache_key
+      Import::PlaceholderReferences::BaseService.new(
+        import_source: import_source,
+        import_uid: import_uid
+      ).send(:cache_key)
+    end
+  end
+end
diff --git a/spec/services/import/placeholder_references/push_service_spec.rb b/spec/services/import/placeholder_references/push_service_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..b6de3af433779310bd1a1989e84d764fc5f91559
--- /dev/null
+++ b/spec/services/import/placeholder_references/push_service_spec.rb
@@ -0,0 +1,93 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Import::PlaceholderReferences::PushService, :aggregate_failures, :clean_gitlab_redis_shared_state, feature_category: :importers do
+  let(:import_source) { Import::SOURCE_DIRECT_TRANSFER }
+  let(:import_uid) { 1 }
+
+  describe '#execute' do
+    let(:composite_key) { nil }
+    let(:numeric_key) { 9 }
+
+    subject(:result) do
+      described_class.new(
+        import_source: import_source,
+        import_uid: import_uid,
+        source_user_id: 123,
+        source_user_namespace_id: 234,
+        model: MergeRequest,
+        user_reference_column: 'author_id',
+        numeric_key: numeric_key,
+        composite_key: composite_key
+      ).execute
+    end
+
+    it 'pushes data to Redis' do
+      expected_result = [nil, 'MergeRequest', 234, 9, 123, 'author_id'].to_json
+
+      expect(result).to be_success
+      expect(result.payload).to eq(serialized_reference: expected_result)
+      expect(set).to contain_exactly(expected_result)
+    end
+
+    context 'when composite_key is provided' do
+      let(:numeric_key) { nil }
+      let(:composite_key) { { 'foo' => 1 } }
+
+      it 'pushes data to Redis containing the composite_key' do
+        expected_result = [
+          { 'foo' => 1 }, 'MergeRequest', 234, nil, 123, 'author_id'
+        ].to_json
+
+        expect(result).to be_success
+        expect(result.payload).to eq(serialized_reference: expected_result)
+        expect(set).to contain_exactly(expected_result)
+      end
+    end
+
+    context 'when the reference is invalid' do
+      let(:composite_key) { { 'foo' => 1 } }
+
+      it 'does not push data to Redis' do
+        expect(result).to be_error
+        expect(result.message).to include('numeric_key or composite_key must be present')
+        expect(set).to be_empty
+      end
+    end
+  end
+
+  describe '.from_record' do
+    let_it_be(:source_user) { create(:import_source_user) }
+    let_it_be(:record) { create(:merge_request) }
+
+    subject(:result) do
+      described_class.from_record(
+        import_source: import_source,
+        import_uid: import_uid,
+        source_user: source_user,
+        record: record,
+        user_reference_column: 'author_id'
+      ).execute
+    end
+
+    it 'pushes data to Redis' do
+      expected_result = [nil, 'MergeRequest', source_user.namespace_id, record.id, source_user.id, 'author_id'].to_json
+
+      expect(result).to be_success
+      expect(result.payload).to eq(serialized_reference: expected_result)
+      expect(set).to contain_exactly(expected_result)
+    end
+  end
+
+  def set
+    Gitlab::Cache::Import::Caching.values_from_set(cache_key)
+  end
+
+  def cache_key
+    Import::PlaceholderReferences::BaseService.new(
+      import_source: import_source,
+      import_uid: import_uid
+    ).send(:cache_key)
+  end
+end
diff --git a/spec/workers/import/load_placeholder_references_worker_spec.rb b/spec/workers/import/load_placeholder_references_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..92b53cd97778b41e9e1b3ed301cc0a8ef2789b6f
--- /dev/null
+++ b/spec/workers/import/load_placeholder_references_worker_spec.rb
@@ -0,0 +1,52 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Import::LoadPlaceholderReferencesWorker, feature_category: :importers do
+  let(:user) { create(:user) }
+  let(:import_source) { 'test_source' }
+  let(:uid) { 123 }
+  let(:params) { { 'current_user_id' => user.id } }
+
+  describe '#perform' do
+    subject(:perform) { described_class.new.perform(import_source, uid, params) }
+
+    it 'executes LoadService' do
+      expect_next_instance_of(Import::PlaceholderReferences::LoadService) do |service|
+        expect(service).to receive(:execute)
+      end
+
+      perform
+    end
+
+    it_behaves_like 'an idempotent worker' do
+      let(:job_args) { [import_source, uid, params] }
+    end
+
+    context 'when bulk_import_user_mapping feature is disabled' do
+      before do
+        stub_feature_flags(bulk_import_user_mapping: false)
+      end
+
+      it 'does not execute LoadService' do
+        expect(Import::PlaceholderReferences::LoadService).not_to receive(:new)
+
+        perform
+      end
+
+      context 'when bulk_import_user_mapping feature is enabled for the user' do
+        before do
+          stub_feature_flags(bulk_import_user_mapping: user)
+        end
+
+        it 'executes LoadService' do
+          expect_next_instance_of(Import::PlaceholderReferences::LoadService) do |service|
+            expect(service).to receive(:execute)
+          end
+
+          perform
+        end
+      end
+    end
+  end
+end