From 085656155784ee7b8c8dc4d62174fef5626e102c Mon Sep 17 00:00:00 2001
From: Luke Duncalfe
Date: Wed, 19 Jun 2024 13:13:33 +1200
Subject: [PATCH 1/8] Load placeholder references from Redis into PG
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

https://gitlab.com/gitlab-org/gitlab/-/issues/443554 introduced a single
table to contain all details of placeholder user contributions. For every
importer, we record one row per imported record that is associated with a
user.

Our table design has the benefit of being simple, but writing each row to
the table individually is likely to cause scalability problems during
large imports, and any failure to write this data to PostgreSQL means
data loss for customers.

This change writes the placeholder user contribution data to Redis first,
from where it is later collected and batch-loaded into the PostgreSQL
table.

https://gitlab.com/gitlab-org/gitlab/-/issues/467511
---
 .../source_user_placeholder_reference.rb      |  32 +++
 .../placeholder_references/base_service.rb    |  46 ++++
 .../placeholder_references/load_service.rb    | 100 +++++++
 .../placeholder_references/push_service.rb    |  65 +++++
 app/workers/all_queues.yml                    |   9 +
 .../load_placeholder_contributions_worker.rb  |  27 ++
 config/sidekiq_queues.yml                     |   2 +
 lib/gitlab/cache/import/caching.rb            |  28 +-
 spec/lib/gitlab/cache/import/caching_spec.rb  |  30 ++-
 .../source_user_placeholder_reference_spec.rb |  74 ++++++
 .../load_service_spec.rb                      | 245 ++++++++++++++++++
 .../push_service_spec.rb                      |  71 +++++
 12 files changed, 723 insertions(+), 6 deletions(-)
 create mode 100644 app/services/import/placeholder_references/base_service.rb
 create mode 100644 app/services/import/placeholder_references/load_service.rb
 create mode 100644 app/services/import/placeholder_references/push_service.rb
 create mode 100644 app/workers/gitlab/import/load_placeholder_contributions_worker.rb
 create mode 100644 spec/services/import/placeholder_references/load_service_spec.rb
 create mode 100644 spec/services/import/placeholder_references/push_service_spec.rb

diff --git a/app/models/import/source_user_placeholder_reference.rb b/app/models/import/source_user_placeholder_reference.rb
index 66c6647e3d1386..5a866c87d4929c 100644
--- a/app/models/import/source_user_placeholder_reference.rb
+++ b/app/models/import/source_user_placeholder_reference.rb
@@ -2,6 +2,8 @@
 module Import
   class SourceUserPlaceholderReference < ApplicationRecord
+    include BulkInsertSafe
+
     self.table_name = 'import_source_user_placeholder_references'
 
     belongs_to :source_user, class_name: 'Import::SourceUser'
@@ -16,6 +18,36 @@ class SourceUserPlaceholderReference < ApplicationRecord
 
     attribute :composite_key, :ind_jsonb
 
+    # If an element is ever added to this array, ensure that `.from_compressed` handles receiving
+    # older versions of the array by filling in missing values with defaults. We'd keep that in place
+    # for at least one release cycle to ensure backward compatibility.
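+    # For illustration, `#to_compressed` serializes these attribute values, in this order, as a
+    # JSON array: e.g. '[{"key":1},"Model",2,1,3,"foo"]' (see the model specs below).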
+ COMPRESSABLE_ATTRIBUTES = %w[ + composite_key + model + namespace_id + numeric_key + source_user_id + user_reference_column + ].freeze + + DecompressionError = Class.new(StandardError) + + def to_compressed + Gitlab::Json.dump(attributes.slice(*COMPRESSABLE_ATTRIBUTES).to_h.values) + end + + class << self + def from_compressed(compressed_reference) + uncompressed = Gitlab::Json.parse(compressed_reference) + + raise DecompressionError if uncompressed.size != COMPRESSABLE_ATTRIBUTES.size + + attributes = COMPRESSABLE_ATTRIBUTES.zip(uncompressed).to_h + + new(attributes.merge(created_at: Time.zone.now)) + end + end + private def validate_numeric_or_composite_key_present diff --git a/app/services/import/placeholder_references/base_service.rb b/app/services/import/placeholder_references/base_service.rb new file mode 100644 index 00000000000000..2aa083d796a1eb --- /dev/null +++ b/app/services/import/placeholder_references/base_service.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module Import + module PlaceholderReferences + class BaseService + include Services::ReturnServiceResponses + + def initialize(import_source:, import_uid:) + @import_source = import_source + @import_uid = import_uid + end + + private + + attr_reader :import_source, :import_uid, :reference + + def cache + Gitlab::Cache::Import::Caching + end + + def cache_key + @cache_key ||= [:'placeholder-reference', import_source, import_uid].join(':') + end + + def logger + Framework::Logger + end + + def log_info(...) + logger.info(logger_params(...)) + end + + def log_error(...) + logger.error(logger_params(...)) + end + + def logger_params(message:, **params) + params.merge( + message: message, + import_source: import_source, + import_uid: import_uid + ) + end + end + end +end diff --git a/app/services/import/placeholder_references/load_service.rb b/app/services/import/placeholder_references/load_service.rb new file mode 100644 index 00000000000000..d3501c45e660fc --- /dev/null +++ b/app/services/import/placeholder_references/load_service.rb @@ -0,0 +1,100 @@ +# frozen_string_literal: true + +module Import + module PlaceholderReferences + class LoadService < BaseService + extend ::Gitlab::Utils::Override + + BATCH_LIMIT = 500 + + def initialize(import_source:, import_uid:) + super(import_source: import_source, import_uid: import_uid) + + @processed_count = 0 + @error_count = 0 + end + + def execute + loop do + batch = next_batch + break if batch.empty? + + load!(batch) + + break if batch.size < BATCH_LIMIT + end + + log_info( + message: 'Processed placeholder references', + processed_count: processed_count, + error_count: error_count + ) + + success(processed_count: processed_count, error_count: error_count) + end + + private + + attr_accessor :error_count, :processed_count + + def next_batch + # Maybe have a sanity check where we log and delete a batch if we think we've seen it a few times + # Just so we can't get stuck in a forever loop + # + # take a checksum of the array, use it to increment a count to say how many times we've seen it + cache.values_from_list(cache_key, stop: BATCH_LIMIT - 1) + end + + def load!(batch) + to_load = batch.filter_map do |item| + SourceUserPlaceholderReference.from_compressed(item) + rescue JSON::ParserError, SourceUserPlaceholderReference::DecompressionError => e + log_error(item, e) + nil + end + + begin + bulk_insert!(to_load) + rescue ActiveRecord::RecordInvalid => e # What other kinds of errors? 
+ # We optimise for all records being valid and only filter for validity + # when there was a problem + to_load.reject! do |item| + next false if item.valid? + + log_error(item.attributes, e) + true + end + + # Try again + bulk_insert!(to_load) + end + + clear_batch!(batch) + end + + def bulk_insert!(to_load) + Import::SourceUserPlaceholderReference.bulk_insert!(to_load, batch_size: to_load.size) + end + + def clear_batch!(batch) + processed_count = batch.size + + self.processed_count += processed_count + + cache.list_pop(cache_key, limit: processed_count) + end + + override :log_error + def log_error(item, exception) + super( + message: 'Error processing placeholder reference', + item: item, + exception_class: exception.class, + exception_message: exception.message + ) + + self.error_count += 1 + end + end + end +end diff --git a/app/services/import/placeholder_references/push_service.rb b/app/services/import/placeholder_references/push_service.rb new file mode 100644 index 00000000000000..e8da5a91b37a26 --- /dev/null +++ b/app/services/import/placeholder_references/push_service.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true + +module Import + module PlaceholderReferences + class PushService < BaseService + # @param import_source [String, Symbol] + # @param import_uid [String, Integer] An ID that is unique when scoped to the `import_source`. + # Used to generate a cache key. + # @param source_user [Import::SourceUser] + # @param record [ApplicationRecord] Record that has a placeholder user reference. + # @param user_reference_column [String, Symbol] Name of column on record which has the placeholder user reference. + # Example: 'author_id'. + # @param composite_key [Hash] Optional, for records that do not have a primary key. Provides arguments for a + # where clause to look up the record by. + # Example: { issue_id: 1, user_id: 2 }. + def initialize(import_source:, import_uid:, source_user:, record:, user_reference_column:, composite_key: nil) # rubocop:disable Layout/LineLength -- It's easier to read the arguments when they're on a single line + super(import_source: import_source, import_uid: import_uid) + + @source_user = source_user + @record = record + @user_reference_column = user_reference_column + @composite_key = composite_key + @model = record.class + + @reference = build_reference + end + + def execute + return error(reference.errors.full_messages, :bad_request) unless reference.valid? + + compressed_reference = reference.to_compressed + + cache.list_add(cache_key, compressed_reference, timeout: cache_ttl) + + success(compressed_reference: compressed_reference) + end + + private + + attr_reader :composite_key, :model, :record, :reference, :source_user, :user_reference_column + attr_accessor :numeric_key + + def cache_ttl + Gitlab::Cache::Import::Caching::TIMEOUT + end + + def build_reference + primary_key_column = model.primary_key + + if primary_key_column && composite_key.nil? 
+ numeric_key = record.send(primary_key_column) # rubocop:disable GitlabSecurity/PublicSend -- I want to + end + + Import::SourceUserPlaceholderReference.new( + model: model.name, + source_user_id: source_user.id, + namespace_id: source_user.namespace_id, + user_reference_column: user_reference_column, + numeric_key: numeric_key, + composite_key: composite_key + ) + end + end + end +end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 9bb4943747163f..78ed6468411439 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -3225,6 +3225,15 @@ :weight: 2 :idempotent: true :tags: [] +- :name: import_load_placeholder_contributions + :worker_name: Gitlab::Import::LoadPlaceholderContributionsWorker + :feature_category: :importers + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: import_refresh_import_jid :worker_name: Gitlab::Import::RefreshImportJidWorker :feature_category: :importers diff --git a/app/workers/gitlab/import/load_placeholder_contributions_worker.rb b/app/workers/gitlab/import/load_placeholder_contributions_worker.rb new file mode 100644 index 00000000000000..1e24343a3fb7b5 --- /dev/null +++ b/app/workers/gitlab/import/load_placeholder_contributions_worker.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Gitlab + module Import + class LoadPlaceholderContributionsWorker + include ApplicationWorker + + data_consistency :delayed + idempotent! # TODO check args make this idempotent + + feature_category :importers + sidekiq_options dead: false # TODO maybe we want this? + # TODO check other sidekiq options + # TODO queue? + + # sidekiq_options retry: 5 + + # import_source - TODO + # uid - TODO + # params - to avoid multiple releases if parameters change + def perform(import_source, uid, _params = {}) + # TODO add metadata logging + ::Import::PlaceholderReferences::LoadService.new(import_source: import_source, uid: uid).execute + end + end + end +end diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index a1b5580af77544..1aaf0851eccc1c 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -399,6 +399,8 @@ - 1 - - import_issues_csv - 2 +- - import_load_placeholder_contributions + - 1 - - import_refresh_import_jid - 1 - - incident_management diff --git a/lib/gitlab/cache/import/caching.rb b/lib/gitlab/cache/import/caching.rb index 0b24137076dd66..3880845d35f629 100644 --- a/lib/gitlab/cache/import/caching.rb +++ b/lib/gitlab/cache/import/caching.rb @@ -258,12 +258,12 @@ def self.hash_increment(raw_key, field, value, timeout: TIMEOUT) end end - # Adds a value to a list. + # Adds a value to the end of a list. # # raw_key - The key of the list to add to. # value - The field value to add to the list. # timeout - The new timeout of the key. - # limit - The maximum number of members in the set. Older members will be trimmed to this limit. + # limit - The maximum number of members in the list. Older members will be trimmed to this limit. def self.list_add(raw_key, value, timeout: TIMEOUT, limit: nil) validate_redis_value!(value) @@ -278,14 +278,32 @@ def self.list_add(raw_key, value, timeout: TIMEOUT, limit: nil) end end - # Returns the values of the given list. + # Returns the values in a list. + # The offsets start and stop are zero-based indexes, with 0 being the first element of the list + # (the front of the list), 1 being the next element and so on. # # raw_key - The key of the list. 
- def self.values_from_list(raw_key) + # start - The zero-based index of the list to start at (default: 0 the front of the list). + # stop - The zero-based index of the list to stop at (default: -1 the end of the list). + # Note that values_from_list(start: 0, stop: 10) will return 11 elements. + def self.values_from_list(raw_key, start: 0, stop: -1) key = cache_key_for(raw_key) with_redis do |redis| - redis.lrange(key, 0, -1) + redis.lrange(key, start, stop) + end + end + + # Pops from front of list and returns an array of values. + # Will return nil if list is empty. + # + # raw_key - The key of the list to pop from. + # limit - The number of values to pop from the list (default: 1). + def self.list_pop(raw_key, limit: 1) + key = cache_key_for(raw_key) + + with_redis do |redis| + redis.lpop(key, limit) end end diff --git a/spec/lib/gitlab/cache/import/caching_spec.rb b/spec/lib/gitlab/cache/import/caching_spec.rb index 9bc24e2ef956b9..f8c5ef063dba44 100644 --- a/spec/lib/gitlab/cache/import/caching_spec.rb +++ b/spec/lib/gitlab/cache/import/caching_spec.rb @@ -286,7 +286,7 @@ end describe '.values_from_list' do - it 'returns empty hash when the list is empty' do + it 'returns empty array when the list is empty' do expect(described_class.values_from_list('foo')).to eq([]) end @@ -297,6 +297,34 @@ expect(described_class.values_from_list('foo')).to eq(%w[10 20 10]) end + + it 'returns the items between two indexes with `start` and `stop` args' do + described_class.list_add('foo', 1) + described_class.list_add('foo', 2) + described_class.list_add('foo', 3) + + expect(described_class.values_from_list('foo', start: 1)).to eq(%w[2 3]) + expect(described_class.values_from_list('foo', start: 1, stop: 1)).to eq(%w[2]) + expect(described_class.values_from_list('foo', stop: 1)).to eq(%w[1 2]) + end + end + + describe '.list_pop' do + it 'pops the first item stored in the list as an array' do + described_class.list_add('foo', 1) + described_class.list_add('foo', 2) + + expect(described_class.list_pop('foo')).to eq(['1']) + expect(described_class.list_pop('foo')).to eq(['2']) + expect(described_class.list_pop('foo')).to be_nil + end + + it 'pops a number of items at once with the `limit` arg' do + described_class.list_add('foo', 1) + described_class.list_add('foo', 2) + + expect(described_class.list_pop('foo', limit: 2)).to eq(%w[1 2]) + end end describe '.del' do diff --git a/spec/models/import/source_user_placeholder_reference_spec.rb b/spec/models/import/source_user_placeholder_reference_spec.rb index e441e065f59d4f..8fc60b358d2394 100644 --- a/spec/models/import/source_user_placeholder_reference_spec.rb +++ b/spec/models/import/source_user_placeholder_reference_spec.rb @@ -42,4 +42,78 @@ def validation_errors(...) expect { reference.source_user.destroy! }.to change { described_class.count }.by(-1) end + + describe 'COMPRESSABLE_ATTRIBUTES' do + subject(:constant) { described_class::COMPRESSABLE_ATTRIBUTES } + + it 'is the expected list' do + expected_elements = %w[ + composite_key + model + namespace_id + numeric_key + source_user_id + user_reference_column + ] + + failure_message = <<-MSG + Before fixing this spec, ensure that `#{described_class.name}.from_compressed` + handles receiving older versions of the array by filling in missing values with defaults. + + You are seeing this message because COMPRESSABLE_ATTRIBUTES has changed. 
+ MSG + + expect(constant).to eq(expected_elements), failure_message + end + end + + describe '#to_compressed' do + let(:reference) do + build(:import_source_user_placeholder_reference, + numeric_key: 1, + namespace_id: 2, + source_user_id: 3, + user_reference_column: 'foo', + model: 'Model', + composite_key: { key: 1 } + ) + end + + subject(:compressed) { reference.to_compressed } + + it { is_expected.to eq('[{"key":1},"Model",2,1,3,"foo"]') } + end + + describe '.from_compressed' do + subject(:from_compressed) { described_class.from_compressed(compressed) } + + context 'when compressed reference is valid' do + let(:compressed) { '[{"key":1},"Model",2,null,3,"foo"]' } + + it 'returns a valid SourceUserPlaceholderReference' do + expect(from_compressed).to be_a(described_class) + .and(be_valid) + .and(have_attributes( + composite_key: { key: 1 }, + model: 'Model', + numeric_key: nil, + namespace_id: 2, + source_user_id: 3, + user_reference_column: 'foo' + )) + end + + it 'sets the created_at' do + expect(from_compressed.created_at).to be_like_time(Time.zone.now) + end + end + + context 'when compressed reference has different number of elements than expected' do + let(:compressed) { '[{"key":1},"Model",2,null,3]' } + + it 'raises an exception' do + expect { from_compressed }.to raise_error(described_class::DecompressionError) + end + end + end end diff --git a/spec/services/import/placeholder_references/load_service_spec.rb b/spec/services/import/placeholder_references/load_service_spec.rb new file mode 100644 index 00000000000000..1ccca2bd8d2cbf --- /dev/null +++ b/spec/services/import/placeholder_references/load_service_spec.rb @@ -0,0 +1,245 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Import::PlaceholderReferences::LoadService, feature_category: :importers do + let(:import_source) { Import::SOURCE_DIRECT_TRANSFER } + let(:import_uid) { 1 } + + describe '#execute', :aggregate_failures, :clean_gitlab_redis_shared_state do + let_it_be(:source_user) { create(:import_source_user) } + let_it_be(:valid_reference) do + { + composite_key: nil, + numeric_key: 1, + model: 'Foo', + namespace_id: source_user.namespace_id, + source_user_id: source_user.id, + user_reference_column: 'user_id' + } + end + + let(:valid_reference_2) { valid_reference.merge(model: 'Bar') } + let(:valid_reference_3) { valid_reference.merge(model: 'Baz') } + + subject(:result) { described_class.new(import_source: import_source, import_uid: import_uid).execute } + + it 'loads data pushed with `Import::PlaceholderReferences::PushService`' do + record = create(:note) + + Import::PlaceholderReferences::PushService.new( + import_source: import_source, + import_uid: import_uid, + source_user: source_user, + record: record, + user_reference_column: :author_id + ).execute + + expect(Import::SourceUserPlaceholderReference) + .to receive(:bulk_insert!) 
+ .with(kind_of(Array), batch_size: 1) + .and_call_original + + expect_log_message(:info, message: 'Processed placeholder references', processed_count: 1, error_count: 0) + expect(result).to be_success + expect(result.payload).to eq(processed_count: 1, error_count: 0) + expect(Import::SourceUserPlaceholderReference.all).to contain_exactly( + have_attributes({ + composite_key: nil, + numeric_key: record.id, + model: record.class.name, + namespace_id: source_user.namespace_id, + source_user_id: source_user.id, + user_reference_column: 'author_id' + }) + ) + expect(cache_list_length).to eq(0) + end + + it 'loads data to PostgreSQL in batches' do + push(valid_reference) + push(valid_reference_2) + push(valid_reference_3) + + stub_const("#{described_class}::BATCH_LIMIT", 2) + + expect(Import::SourceUserPlaceholderReference) + .to receive(:bulk_insert!) + .with(kind_of(Array), batch_size: be_between(1, 2)) + .twice + .and_call_original + + expect_log_message(:info, message: 'Processed placeholder references', processed_count: 3, error_count: 0) + expect(result).to be_success + expect(result.payload).to eq(processed_count: 3, error_count: 0) + expect(Import::SourceUserPlaceholderReference.all).to contain_exactly( + have_attributes(valid_reference), + have_attributes(valid_reference_2), + have_attributes(valid_reference_3) + ) + expect(cache_list_length).to eq(0) + end + + it 'does not load data for another import_uid' do + push(valid_reference) + + result = described_class.new(import_source: import_source, import_uid: 2).execute + + expect(result).to be_success + expect(result.payload).to eq(processed_count: 0, error_count: 0) + expect(Import::SourceUserPlaceholderReference.count).to eq(0) + expect(cache_list_length).to eq(1) + end + + it 'does not load data for another import_source' do + push(valid_reference) + + result = described_class.new( + import_source: ::Import::SOURCE_PROJECT_EXPORT_IMPORT, + import_uid: import_uid + ).execute + + expect(result).to be_success + expect(result.payload).to eq(processed_count: 0, error_count: 0) + expect(Import::SourceUserPlaceholderReference.count).to eq(0) + expect(cache_list_length).to eq(1) + end + + context 'when something in the batch does not validate' do + let(:invalid_reference) { valid_reference.except(:source_user_id) } + + before do + stub_const("#{described_class}::BATCH_LIMIT", 2) + + push(valid_reference) + push(invalid_reference) + push(valid_reference_2) + push(valid_reference_3) + end + + it 'loads just the valid data, and clears the list' do + expect_log_message(:error, + message: 'Error processing placeholder reference', + exception_class: ActiveRecord::RecordInvalid, + exception_message: "Validation failed: Source user can't be blank", + item: hash_including(invalid_reference.stringify_keys) + ) + expect_log_message(:info, message: 'Processed placeholder references', processed_count: 4, error_count: 1) + expect(result).to be_success + expect(result.payload).to eq(processed_count: 4, error_count: 1) + expect(Import::SourceUserPlaceholderReference.all).to contain_exactly( + have_attributes(valid_reference), + have_attributes(valid_reference_2), + have_attributes(valid_reference_3) + ) + expect(cache_list_length).to eq(0) + end + end + + context 'when something in the batch has an unexpected schema' do + let(:invalid_reference) { valid_reference.merge(foo: 'bar') } + + before do + stub_const("#{described_class}::BATCH_LIMIT", 2) + + push(valid_reference) + push(valid_reference_2) + # We can't use `#push` because that validates, + # so push 
directly to the list. + Gitlab::Cache::Import::Caching.list_add(cache_key, invalid_reference.values.to_json) + push(valid_reference_3) + end + + it 'loads just the valid data, and clears the list' do + expect_log_message(:error, + message: 'Error processing placeholder reference', + exception_class: Import::SourceUserPlaceholderReference::DecompressionError, + exception_message: 'Import::SourceUserPlaceholderReference::DecompressionError', + item: invalid_reference.values.to_json + ) + expect_log_message(:info, message: 'Processed placeholder references', processed_count: 4, error_count: 1) + expect(result).to be_success + expect(result.payload).to eq(processed_count: 4, error_count: 1) + expect(Import::SourceUserPlaceholderReference.all).to contain_exactly( + have_attributes(valid_reference), + have_attributes(valid_reference_2), + have_attributes(valid_reference_3) + ) + expect(cache_list_length).to eq(0) + end + end + + context 'when loading to PostgreSQL fails' do + before do + allow(Import::SourceUserPlaceholderReference) + .to receive(:bulk_insert!) + .and_raise(ActiveRecord::ConnectionTimeoutError) + end + + it 'bubbles the exception and does not clear the list' do + push(valid_reference) + + expect { result }.to raise_error(ActiveRecord::ConnectionTimeoutError) + expect(Import::SourceUserPlaceholderReference.count).to eq(0) + expect(cache_list_length).to eq(1) + end + end + + context 'when fetching list from Redis fails' do + before do + allow(Gitlab::Cache::Import::Caching) + .to receive(:values_from_list) + .and_raise(Redis::ConnectionError) + end + + it 'bubbles the exception, does not load any data, and does not clear the list' do + push(valid_reference) + + expect { result }.to raise_error(Redis::ConnectionError) + expect(Import::SourceUserPlaceholderReference.count).to eq(0) + expect(cache_list_length).to eq(1) + end + end + + context 'when popping the list from Redis fails' do + before do + allow(Gitlab::Cache::Import::Caching) + .to receive(:list_pop).once + .and_raise(Redis::ConnectionError) + end + + it 'bubbles the exception and does not clear the list, but does load the data' do + push(valid_reference) + + expect { result }.to raise_error(Redis::ConnectionError) + # (We tolerate that this leads to duplicate records!) 
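+        # A retried job would read the same batch from Redis again and insert it a second time,
+        # since the bulk insert succeeded but the cached batch was never cleared.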
+ expect(Import::SourceUserPlaceholderReference.count).to eq(1) + expect(cache_list_length).to eq(1) + end + end + + def push(reference) + compressed = Import::SourceUserPlaceholderReference.new(reference).to_compressed + Gitlab::Cache::Import::Caching.list_add(cache_key, compressed) + end + + def expect_log_message(type, **params) + expect(Import::Framework::Logger).to receive(type) + .with(params.merge(import_source: import_source, import_uid: import_uid)) + .and_call_original + end + + def cache_list_length + Gitlab::Cache::Import::Caching.with_redis do |redis| + redis.llen(Gitlab::Cache::Import::Caching.cache_key_for(cache_key)) + end + end + + def cache_key + Import::PlaceholderReferences::BaseService.new( + import_source: import_source, + import_uid: import_uid + ).send(:cache_key) + end + end +end diff --git a/spec/services/import/placeholder_references/push_service_spec.rb b/spec/services/import/placeholder_references/push_service_spec.rb new file mode 100644 index 00000000000000..20526d7df51cb9 --- /dev/null +++ b/spec/services/import/placeholder_references/push_service_spec.rb @@ -0,0 +1,71 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Import::PlaceholderReferences::PushService, feature_category: :importers do + let(:import_source) { Import::SOURCE_DIRECT_TRANSFER } + let(:import_uid) { 1 } + let(:composite_key) { nil } + + let_it_be(:source_user) { create(:import_source_user) } + let_it_be(:record) { create(:merge_request) } + + describe '#execute', :aggregate_failures, :clean_gitlab_redis_shared_state do + subject(:result) do + described_class.new( + import_source: import_source, + import_uid: import_uid, + source_user: source_user, + record: record, + user_reference_column: 'author_id', + composite_key: composite_key + ).execute + end + + it 'pushes data to Redis' do + expected_result = [nil, 'MergeRequest', source_user.namespace_id, record.id, source_user.id, 'author_id'].to_json + + expect(result).to be_success + expect(result.payload).to eq(compressed_reference: expected_result) + expect(cache_list).to contain_exactly(expected_result) + end + + context 'when is invalid' do + before do + invalid_reference = Import::SourceUserPlaceholderReference.new + allow(Import::SourceUserPlaceholderReference).to receive(:new).and_return(invalid_reference) + end + + it 'does not push data to Redis' do + expect(result).to be_error + expect(result.message).to include("Model can't be blank") + expect(cache_list).to be_empty + end + end + + context 'when composite_key is provided' do + let(:composite_key) { { 'foo' => 1 } } + + it 'pushes data to Redis containing the composite_key, and not the numeric_key' do + expected_result = [ + { 'foo' => 1 }, 'MergeRequest', source_user.namespace_id, nil, source_user.id, 'author_id' + ].to_json + + expect(result).to be_success + expect(result.payload).to eq(compressed_reference: expected_result) + expect(cache_list).to contain_exactly(expected_result) + end + end + + def cache_list + Gitlab::Cache::Import::Caching.values_from_list(cache_key) + end + + def cache_key + Import::PlaceholderReferences::BaseService.new( + import_source: import_source, + import_uid: import_uid + ).send(:cache_key) + end + end +end -- GitLab From b8cbdfe1f5f4fdae8004ae1405e2e024789b32f1 Mon Sep 17 00:00:00 2001 From: Luke Duncalfe Date: Thu, 4 Jul 2024 17:18:15 +1200 Subject: [PATCH 2/8] Use sets instead of lists --- .../placeholder_references/load_service.rb | 4 +- .../placeholder_references/push_service.rb | 65 ++++++--------- 
lib/gitlab/cache/import/caching.rb | 50 ++++++----- spec/lib/gitlab/cache/import/caching_spec.rb | 82 ++++++++++++------- .../source_user_placeholder_reference_spec.rb | 2 +- .../load_service_spec.rb | 46 +++++------ .../push_service_spec.rb | 76 ++++++++++------- 7 files changed, 183 insertions(+), 142 deletions(-) diff --git a/app/services/import/placeholder_references/load_service.rb b/app/services/import/placeholder_references/load_service.rb index d3501c45e660fc..0e4ffa2d3ec165 100644 --- a/app/services/import/placeholder_references/load_service.rb +++ b/app/services/import/placeholder_references/load_service.rb @@ -42,7 +42,7 @@ def next_batch # Just so we can't get stuck in a forever loop # # take a checksum of the array, use it to increment a count to say how many times we've seen it - cache.values_from_list(cache_key, stop: BATCH_LIMIT - 1) + cache.limited_values_from_set(cache_key, limit: BATCH_LIMIT) end def load!(batch) @@ -81,7 +81,7 @@ def clear_batch!(batch) self.processed_count += processed_count - cache.list_pop(cache_key, limit: processed_count) + cache.set_remove(cache_key, batch) end override :log_error diff --git a/app/services/import/placeholder_references/push_service.rb b/app/services/import/placeholder_references/push_service.rb index e8da5a91b37a26..7bb9483303cdc8 100644 --- a/app/services/import/placeholder_references/push_service.rb +++ b/app/services/import/placeholder_references/push_service.rb @@ -3,26 +3,33 @@ module Import module PlaceholderReferences class PushService < BaseService - # @param import_source [String, Symbol] - # @param import_uid [String, Integer] An ID that is unique when scoped to the `import_source`. - # Used to generate a cache key. - # @param source_user [Import::SourceUser] - # @param record [ApplicationRecord] Record that has a placeholder user reference. - # @param user_reference_column [String, Symbol] Name of column on record which has the placeholder user reference. - # Example: 'author_id'. - # @param composite_key [Hash] Optional, for records that do not have a primary key. Provides arguments for a - # where clause to look up the record by. - # Example: { issue_id: 1, user_id: 2 }. - def initialize(import_source:, import_uid:, source_user:, record:, user_reference_column:, composite_key: nil) # rubocop:disable Layout/LineLength -- It's easier to read the arguments when they're on a single line - super(import_source: import_source, import_uid: import_uid) + class << self + def from_record(import_source:, import_uid:, source_user:, record:, user_reference_column:, composite_key: nil) + numeric_key = record.id if composite_key.nil? 
&& record.id.is_a?(Integer) + + new( + import_source: import_source, + import_uid: import_uid, + source_user: source_user, + model: record.class, + user_reference_column: user_reference_column, + numeric_key: numeric_key, + composite_key: composite_key + ) + end + end - @source_user = source_user - @record = record - @user_reference_column = user_reference_column - @composite_key = composite_key - @model = record.class + def initialize(import_source:, import_uid:, source_user:, model:, user_reference_column:, numeric_key: nil, composite_key: nil) # rubocop:disable Layout/LineLength -- Easier to read on one line + super(import_source: import_source, import_uid: import_uid) - @reference = build_reference + @reference = Import::SourceUserPlaceholderReference.new( + model: model.name, + source_user_id: source_user.id, + namespace_id: source_user.namespace_id, + user_reference_column: user_reference_column, + numeric_key: numeric_key, + composite_key: composite_key + ) end def execute @@ -30,36 +37,18 @@ def execute compressed_reference = reference.to_compressed - cache.list_add(cache_key, compressed_reference, timeout: cache_ttl) + cache.set_add(cache_key, compressed_reference, timeout: cache_ttl) success(compressed_reference: compressed_reference) end private - attr_reader :composite_key, :model, :record, :reference, :source_user, :user_reference_column - attr_accessor :numeric_key + attr_reader :reference def cache_ttl Gitlab::Cache::Import::Caching::TIMEOUT end - - def build_reference - primary_key_column = model.primary_key - - if primary_key_column && composite_key.nil? - numeric_key = record.send(primary_key_column) # rubocop:disable GitlabSecurity/PublicSend -- I want to - end - - Import::SourceUserPlaceholderReference.new( - model: model.name, - source_user_id: source_user.id, - namespace_id: source_user.namespace_id, - user_reference_column: user_reference_column, - numeric_key: numeric_key, - composite_key: composite_key - ) - end end end end diff --git a/lib/gitlab/cache/import/caching.rb b/lib/gitlab/cache/import/caching.rb index 3880845d35f629..beaf9eb666fd95 100644 --- a/lib/gitlab/cache/import/caching.rb +++ b/lib/gitlab/cache/import/caching.rb @@ -154,6 +154,30 @@ def self.values_from_set(raw_key) end end + # Returns a limited number of random values from the set. + # + # raw_key - The key of the set to check. + # limit - Number of values to return (default: 1). + def self.limited_values_from_set(raw_key, limit: 1) + key = cache_key_for(raw_key) + + with_redis do |redis| + redis.srandmember(key, limit) + end + end + + # Removes the given values from the set. + # + # raw_key - The key of the set to check. + # values - Array of values to remove from set. + def self.set_remove(raw_key, values = []) + key = cache_key_for(raw_key) + + with_redis do |redis| + redis.srem(key, values) + end + end + # Sets multiple keys to given values. # # mapping - A Hash mapping the cache keys to their values. @@ -258,7 +282,7 @@ def self.hash_increment(raw_key, field, value, timeout: TIMEOUT) end end - # Adds a value to the end of a list. + # Adds a value to a list. # # raw_key - The key of the list to add to. # value - The field value to add to the list. @@ -278,32 +302,14 @@ def self.list_add(raw_key, value, timeout: TIMEOUT, limit: nil) end end - # Returns the values in a list. - # The offsets start and stop are zero-based indexes, with 0 being the first element of the list - # (the front of the list), 1 being the next element and so on. + # Returns the values of the given list. 
# # raw_key - The key of the list. - # start - The zero-based index of the list to start at (default: 0 the front of the list). - # stop - The zero-based index of the list to stop at (default: -1 the end of the list). - # Note that values_from_list(start: 0, stop: 10) will return 11 elements. - def self.values_from_list(raw_key, start: 0, stop: -1) - key = cache_key_for(raw_key) - - with_redis do |redis| - redis.lrange(key, start, stop) - end - end - - # Pops from front of list and returns an array of values. - # Will return nil if list is empty. - # - # raw_key - The key of the list to pop from. - # limit - The number of values to pop from the list (default: 1). - def self.list_pop(raw_key, limit: 1) + def self.values_from_list(raw_key) key = cache_key_for(raw_key) with_redis do |redis| - redis.lpop(key, limit) + redis.lrange(key, 0, -1) end end diff --git a/spec/lib/gitlab/cache/import/caching_spec.rb b/spec/lib/gitlab/cache/import/caching_spec.rb index f8c5ef063dba44..948961805838f0 100644 --- a/spec/lib/gitlab/cache/import/caching_spec.rb +++ b/spec/lib/gitlab/cache/import/caching_spec.rb @@ -139,6 +139,60 @@ end end + describe '.limited_values_from_set' do + it 'returns empty array when the set does not exist' do + expect(described_class.limited_values_from_set('foo')).to eq([]) + end + + it 'returns a single random member from the set' do + described_class.set_add('foo', 10) + described_class.set_add('foo', 20) + + result = described_class.limited_values_from_set('foo') + + expect(result.size).to eq(1) + expect(result.first).to be_in(%w[10 20]) + end + + it 'returns multiple random members from the set with `limit:`' do + described_class.set_add('foo', 10) + described_class.set_add('foo', 20) + described_class.set_add('foo', 30) + + result = described_class.limited_values_from_set('foo', limit: 2) + + expect(result.size).to eq(2) + expect(result).to all(be_in(%w[10 20 30])) + end + end + + describe '.set_remove' do + it 'returns 0 when the set does not exist' do + expect(described_class.set_remove('foo', 1)).to eq(0) + end + + it 'removes a single value from the set' do + described_class.set_add('foo', 10) + described_class.set_add('foo', 20) + + result = described_class.set_remove('foo', 20) + + expect(result).to eq(1) + expect(described_class.values_from_set('foo')).to contain_exactly('10') + end + + it 'removes a collection of values from the set' do + described_class.set_add('foo', 10) + described_class.set_add('foo', 20) + described_class.set_add('foo', 30) + + result = described_class.set_remove('foo', [10, 30]) + + expect(result).to eq(2) + expect(described_class.values_from_set('foo')).to contain_exactly('20') + end + end + describe '.hash_add' do it 'adds a value to a hash' do described_class.hash_add('foo', 1, 1) @@ -297,34 +351,6 @@ expect(described_class.values_from_list('foo')).to eq(%w[10 20 10]) end - - it 'returns the items between two indexes with `start` and `stop` args' do - described_class.list_add('foo', 1) - described_class.list_add('foo', 2) - described_class.list_add('foo', 3) - - expect(described_class.values_from_list('foo', start: 1)).to eq(%w[2 3]) - expect(described_class.values_from_list('foo', start: 1, stop: 1)).to eq(%w[2]) - expect(described_class.values_from_list('foo', stop: 1)).to eq(%w[1 2]) - end - end - - describe '.list_pop' do - it 'pops the first item stored in the list as an array' do - described_class.list_add('foo', 1) - described_class.list_add('foo', 2) - - expect(described_class.list_pop('foo')).to eq(['1']) - 
expect(described_class.list_pop('foo')).to eq(['2']) - expect(described_class.list_pop('foo')).to be_nil - end - - it 'pops a number of items at once with the `limit` arg' do - described_class.list_add('foo', 1) - described_class.list_add('foo', 2) - - expect(described_class.list_pop('foo', limit: 2)).to eq(%w[1 2]) - end end describe '.del' do diff --git a/spec/models/import/source_user_placeholder_reference_spec.rb b/spec/models/import/source_user_placeholder_reference_spec.rb index 8fc60b358d2394..b9abb566141eba 100644 --- a/spec/models/import/source_user_placeholder_reference_spec.rb +++ b/spec/models/import/source_user_placeholder_reference_spec.rb @@ -46,7 +46,7 @@ def validation_errors(...) describe 'COMPRESSABLE_ATTRIBUTES' do subject(:constant) { described_class::COMPRESSABLE_ATTRIBUTES } - it 'is the expected list' do + it 'is the expected list of attribute names' do expected_elements = %w[ composite_key model diff --git a/spec/services/import/placeholder_references/load_service_spec.rb b/spec/services/import/placeholder_references/load_service_spec.rb index 1ccca2bd8d2cbf..fb1fcc2ca07b40 100644 --- a/spec/services/import/placeholder_references/load_service_spec.rb +++ b/spec/services/import/placeholder_references/load_service_spec.rb @@ -27,7 +27,7 @@ it 'loads data pushed with `Import::PlaceholderReferences::PushService`' do record = create(:note) - Import::PlaceholderReferences::PushService.new( + Import::PlaceholderReferences::PushService.from_record( import_source: import_source, import_uid: import_uid, source_user: source_user, @@ -53,7 +53,7 @@ user_reference_column: 'author_id' }) ) - expect(cache_list_length).to eq(0) + expect(set_members_count).to eq(0) end it 'loads data to PostgreSQL in batches' do @@ -77,7 +77,7 @@ have_attributes(valid_reference_2), have_attributes(valid_reference_3) ) - expect(cache_list_length).to eq(0) + expect(set_members_count).to eq(0) end it 'does not load data for another import_uid' do @@ -88,7 +88,7 @@ expect(result).to be_success expect(result.payload).to eq(processed_count: 0, error_count: 0) expect(Import::SourceUserPlaceholderReference.count).to eq(0) - expect(cache_list_length).to eq(1) + expect(set_members_count).to eq(1) end it 'does not load data for another import_source' do @@ -102,7 +102,7 @@ expect(result).to be_success expect(result.payload).to eq(processed_count: 0, error_count: 0) expect(Import::SourceUserPlaceholderReference.count).to eq(0) - expect(cache_list_length).to eq(1) + expect(set_members_count).to eq(1) end context 'when something in the batch does not validate' do @@ -132,7 +132,7 @@ have_attributes(valid_reference_2), have_attributes(valid_reference_3) ) - expect(cache_list_length).to eq(0) + expect(set_members_count).to eq(0) end end @@ -145,12 +145,12 @@ push(valid_reference) push(valid_reference_2) # We can't use `#push` because that validates, - # so push directly to the list. - Gitlab::Cache::Import::Caching.list_add(cache_key, invalid_reference.values.to_json) + # so push directly to the set. 
+ Gitlab::Cache::Import::Caching.set_add(cache_key, invalid_reference.values.to_json) push(valid_reference_3) end - it 'loads just the valid data, and clears the list' do + it 'loads just the valid data, and clears the set' do expect_log_message(:error, message: 'Error processing placeholder reference', exception_class: Import::SourceUserPlaceholderReference::DecompressionError, @@ -165,7 +165,7 @@ have_attributes(valid_reference_2), have_attributes(valid_reference_3) ) - expect(cache_list_length).to eq(0) + expect(set_members_count).to eq(0) end end @@ -176,51 +176,51 @@ .and_raise(ActiveRecord::ConnectionTimeoutError) end - it 'bubbles the exception and does not clear the list' do + it 'bubbles the exception and does not clear the set' do push(valid_reference) expect { result }.to raise_error(ActiveRecord::ConnectionTimeoutError) expect(Import::SourceUserPlaceholderReference.count).to eq(0) - expect(cache_list_length).to eq(1) + expect(set_members_count).to eq(1) end end - context 'when fetching list from Redis fails' do + context 'when fetching set from Redis fails' do before do allow(Gitlab::Cache::Import::Caching) - .to receive(:values_from_list) + .to receive(:limited_values_from_set) .and_raise(Redis::ConnectionError) end - it 'bubbles the exception, does not load any data, and does not clear the list' do + it 'bubbles the exception, does not load any data, and does not clear the set' do push(valid_reference) expect { result }.to raise_error(Redis::ConnectionError) expect(Import::SourceUserPlaceholderReference.count).to eq(0) - expect(cache_list_length).to eq(1) + expect(set_members_count).to eq(1) end end - context 'when popping the list from Redis fails' do + context 'when clearing the set from Redis fails' do before do allow(Gitlab::Cache::Import::Caching) - .to receive(:list_pop).once + .to receive(:set_remove) .and_raise(Redis::ConnectionError) end - it 'bubbles the exception and does not clear the list, but does load the data' do + it 'bubbles the exception and does not clear the set, but does load the data' do push(valid_reference) expect { result }.to raise_error(Redis::ConnectionError) # (We tolerate that this leads to duplicate records!) 
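         # A retried job would read the same batch from Redis again and insert it a second time,
         # since the bulk insert succeeded but the cached batch was never cleared.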
expect(Import::SourceUserPlaceholderReference.count).to eq(1) - expect(cache_list_length).to eq(1) + expect(set_members_count).to eq(1) end end def push(reference) compressed = Import::SourceUserPlaceholderReference.new(reference).to_compressed - Gitlab::Cache::Import::Caching.list_add(cache_key, compressed) + Gitlab::Cache::Import::Caching.set_add(cache_key, compressed) end def expect_log_message(type, **params) @@ -229,9 +229,9 @@ def expect_log_message(type, **params) .and_call_original end - def cache_list_length + def set_members_count Gitlab::Cache::Import::Caching.with_redis do |redis| - redis.llen(Gitlab::Cache::Import::Caching.cache_key_for(cache_key)) + redis.scard(Gitlab::Cache::Import::Caching.cache_key_for(cache_key)) end end diff --git a/spec/services/import/placeholder_references/push_service_spec.rb b/spec/services/import/placeholder_references/push_service_spec.rb index 20526d7df51cb9..38895f84c1cd60 100644 --- a/spec/services/import/placeholder_references/push_service_spec.rb +++ b/spec/services/import/placeholder_references/push_service_spec.rb @@ -2,22 +2,24 @@ require 'spec_helper' -RSpec.describe Import::PlaceholderReferences::PushService, feature_category: :importers do +RSpec.describe Import::PlaceholderReferences::PushService, :aggregate_failures, :clean_gitlab_redis_shared_state, feature_category: :importers do + let_it_be(:source_user) { create(:import_source_user) } + let_it_be(:record) { create(:merge_request) } + let(:import_source) { Import::SOURCE_DIRECT_TRANSFER } let(:import_uid) { 1 } let(:composite_key) { nil } + let(:numeric_key) { record.id } - let_it_be(:source_user) { create(:import_source_user) } - let_it_be(:record) { create(:merge_request) } - - describe '#execute', :aggregate_failures, :clean_gitlab_redis_shared_state do + describe '#execute' do subject(:result) do described_class.new( import_source: import_source, import_uid: import_uid, source_user: source_user, - record: record, + model: record.class, user_reference_column: 'author_id', + numeric_key: numeric_key, composite_key: composite_key ).execute end @@ -27,45 +29,63 @@ expect(result).to be_success expect(result.payload).to eq(compressed_reference: expected_result) - expect(cache_list).to contain_exactly(expected_result) - end - - context 'when is invalid' do - before do - invalid_reference = Import::SourceUserPlaceholderReference.new - allow(Import::SourceUserPlaceholderReference).to receive(:new).and_return(invalid_reference) - end - - it 'does not push data to Redis' do - expect(result).to be_error - expect(result.message).to include("Model can't be blank") - expect(cache_list).to be_empty - end + expect(set).to contain_exactly(expected_result) end context 'when composite_key is provided' do + let(:numeric_key) { nil } let(:composite_key) { { 'foo' => 1 } } - it 'pushes data to Redis containing the composite_key, and not the numeric_key' do + it 'pushes data to Redis containing the composite_key' do expected_result = [ { 'foo' => 1 }, 'MergeRequest', source_user.namespace_id, nil, source_user.id, 'author_id' ].to_json expect(result).to be_success expect(result.payload).to eq(compressed_reference: expected_result) - expect(cache_list).to contain_exactly(expected_result) + expect(set).to contain_exactly(expected_result) end end - def cache_list - Gitlab::Cache::Import::Caching.values_from_list(cache_key) + context 'when is invalid' do + let(:composite_key) { { 'foo' => 1 } } + + it 'does not push data to Redis' do + expect(result).to be_error + expect(result.message).to 
include('numeric_key or composite_key must be present') + expect(set).to be_empty + end end + end - def cache_key - Import::PlaceholderReferences::BaseService.new( + describe '.from_record' do + subject(:result) do + described_class.from_record( import_source: import_source, - import_uid: import_uid - ).send(:cache_key) + import_uid: import_uid, + source_user: source_user, + record: record, + user_reference_column: 'author_id' + ).execute end + + it 'pushes data to Redis' do + expected_result = [nil, 'MergeRequest', source_user.namespace_id, record.id, source_user.id, 'author_id'].to_json + + expect(result).to be_success + expect(result.payload).to eq(compressed_reference: expected_result) + expect(set).to contain_exactly(expected_result) + end + end + + def set + Gitlab::Cache::Import::Caching.values_from_set(cache_key) + end + + def cache_key + Import::PlaceholderReferences::BaseService.new( + import_source: import_source, + import_uid: import_uid + ).send(:cache_key) end end -- GitLab From cc94306c973792af516bf33318675067d21127fb Mon Sep 17 00:00:00 2001 From: Luke Duncalfe Date: Fri, 5 Jul 2024 15:15:41 +1200 Subject: [PATCH 3/8] Finishing off the worker --- .../placeholder_references/load_service.rb | 9 +--- .../placeholder_references/push_service.rb | 13 ++--- app/workers/all_queues.yml | 4 +- .../load_placeholder_contributions_worker.rb | 27 ---------- .../load_placeholder_references_worker.rb | 22 ++++++++ config/sidekiq_queues.yml | 2 +- .../load_service_spec.rb | 8 ++- .../push_service_spec.rb | 20 +++---- ...load_placeholder_references_worker_spec.rb | 52 +++++++++++++++++++ 9 files changed, 103 insertions(+), 54 deletions(-) delete mode 100644 app/workers/gitlab/import/load_placeholder_contributions_worker.rb create mode 100644 app/workers/import/load_placeholder_references_worker.rb create mode 100644 spec/workers/import/load_placeholder_references_worker_spec.rb diff --git a/app/services/import/placeholder_references/load_service.rb b/app/services/import/placeholder_references/load_service.rb index 0e4ffa2d3ec165..1375b3fca99efd 100644 --- a/app/services/import/placeholder_references/load_service.rb +++ b/app/services/import/placeholder_references/load_service.rb @@ -3,8 +3,6 @@ module Import module PlaceholderReferences class LoadService < BaseService - extend ::Gitlab::Utils::Override - BATCH_LIMIT = 500 def initialize(import_source:, import_uid:) @@ -15,6 +13,8 @@ def initialize(import_source:, import_uid:) end def execute + log_info(message: 'Processing placeholder references') + loop do batch = next_batch break if batch.empty? 
@@ -38,10 +38,6 @@ def execute attr_accessor :error_count, :processed_count def next_batch - # Maybe have a sanity check where we log and delete a batch if we think we've seen it a few times - # Just so we can't get stuck in a forever loop - # - # take a checksum of the array, use it to increment a count to say how many times we've seen it cache.limited_values_from_set(cache_key, limit: BATCH_LIMIT) end @@ -84,7 +80,6 @@ def clear_batch!(batch) cache.set_remove(cache_key, batch) end - override :log_error def log_error(item, exception) super( message: 'Error processing placeholder reference', diff --git a/app/services/import/placeholder_references/push_service.rb b/app/services/import/placeholder_references/push_service.rb index 7bb9483303cdc8..84705f27183744 100644 --- a/app/services/import/placeholder_references/push_service.rb +++ b/app/services/import/placeholder_references/push_service.rb @@ -10,22 +10,23 @@ def from_record(import_source:, import_uid:, source_user:, record:, user_referen new( import_source: import_source, import_uid: import_uid, - source_user: source_user, model: record.class, - user_reference_column: user_reference_column, + composite_key: composite_key, numeric_key: numeric_key, - composite_key: composite_key + source_user_id: source_user.id, + source_user_namespace_id: source_user.namespace_id, + user_reference_column: user_reference_column ) end end - def initialize(import_source:, import_uid:, source_user:, model:, user_reference_column:, numeric_key: nil, composite_key: nil) # rubocop:disable Layout/LineLength -- Easier to read on one line + def initialize(import_source:, import_uid:, source_user_id:, source_user_namespace_id:, model:, user_reference_column:, numeric_key: nil, composite_key: nil) # rubocop:disable Layout/LineLength -- Its easier to read being on one line super(import_source: import_source, import_uid: import_uid) @reference = Import::SourceUserPlaceholderReference.new( model: model.name, - source_user_id: source_user.id, - namespace_id: source_user.namespace_id, + source_user_id: source_user_id, + namespace_id: source_user_namespace_id, user_reference_column: user_reference_column, numeric_key: numeric_key, composite_key: composite_key diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 78ed6468411439..f32d5a3441309e 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -3225,8 +3225,8 @@ :weight: 2 :idempotent: true :tags: [] -- :name: import_load_placeholder_contributions - :worker_name: Gitlab::Import::LoadPlaceholderContributionsWorker +- :name: import_load_placeholder_references + :worker_name: Import::LoadPlaceholderReferencesWorker :feature_category: :importers :has_external_dependencies: false :urgency: :low diff --git a/app/workers/gitlab/import/load_placeholder_contributions_worker.rb b/app/workers/gitlab/import/load_placeholder_contributions_worker.rb deleted file mode 100644 index 1e24343a3fb7b5..00000000000000 --- a/app/workers/gitlab/import/load_placeholder_contributions_worker.rb +++ /dev/null @@ -1,27 +0,0 @@ -# frozen_string_literal: true - -module Gitlab - module Import - class LoadPlaceholderContributionsWorker - include ApplicationWorker - - data_consistency :delayed - idempotent! # TODO check args make this idempotent - - feature_category :importers - sidekiq_options dead: false # TODO maybe we want this? - # TODO check other sidekiq options - # TODO queue? 
- - # sidekiq_options retry: 5 - - # import_source - TODO - # uid - TODO - # params - to avoid multiple releases if parameters change - def perform(import_source, uid, _params = {}) - # TODO add metadata logging - ::Import::PlaceholderReferences::LoadService.new(import_source: import_source, uid: uid).execute - end - end - end -end diff --git a/app/workers/import/load_placeholder_references_worker.rb b/app/workers/import/load_placeholder_references_worker.rb new file mode 100644 index 00000000000000..fa6089181b0fed --- /dev/null +++ b/app/workers/import/load_placeholder_references_worker.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +module Import + class LoadPlaceholderReferencesWorker + include ApplicationWorker + + data_consistency :delayed + deduplicate :until_executed, if_deduplicated: :reschedule_once + idempotent! + feature_category :importers + loggable_arguments 0, 1 + + def perform(import_source, import_uid, params = {}) + return unless Feature.enabled?(:bulk_import_user_mapping, User.actor_from_id(params['current_user_id'])) + + ::Import::PlaceholderReferences::LoadService.new( + import_source: import_source, + import_uid: import_uid + ).execute + end + end +end diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 1aaf0851eccc1c..00a19602d8e0ef 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -399,7 +399,7 @@ - 1 - - import_issues_csv - 2 -- - import_load_placeholder_contributions +- - import_load_placeholder_references - 1 - - import_refresh_import_jid - 1 diff --git a/spec/services/import/placeholder_references/load_service_spec.rb b/spec/services/import/placeholder_references/load_service_spec.rb index fb1fcc2ca07b40..03dcca87be955d 100644 --- a/spec/services/import/placeholder_references/load_service_spec.rb +++ b/spec/services/import/placeholder_references/load_service_spec.rb @@ -40,18 +40,19 @@ .with(kind_of(Array), batch_size: 1) .and_call_original + expect_log_message(:info, message: 'Processing placeholder references') expect_log_message(:info, message: 'Processed placeholder references', processed_count: 1, error_count: 0) expect(result).to be_success expect(result.payload).to eq(processed_count: 1, error_count: 0) expect(Import::SourceUserPlaceholderReference.all).to contain_exactly( - have_attributes({ + have_attributes( composite_key: nil, numeric_key: record.id, model: record.class.name, namespace_id: source_user.namespace_id, source_user_id: source_user.id, user_reference_column: 'author_id' - }) + ) ) expect(set_members_count).to eq(0) end @@ -69,6 +70,7 @@ .twice .and_call_original + expect_log_message(:info, message: 'Processing placeholder references') expect_log_message(:info, message: 'Processed placeholder references', processed_count: 3, error_count: 0) expect(result).to be_success expect(result.payload).to eq(processed_count: 3, error_count: 0) @@ -118,6 +120,7 @@ end it 'loads just the valid data, and clears the list' do + expect_log_message(:info, message: 'Processing placeholder references') expect_log_message(:error, message: 'Error processing placeholder reference', exception_class: ActiveRecord::RecordInvalid, @@ -151,6 +154,7 @@ end it 'loads just the valid data, and clears the set' do + expect_log_message(:info, message: 'Processing placeholder references') expect_log_message(:error, message: 'Error processing placeholder reference', exception_class: Import::SourceUserPlaceholderReference::DecompressionError, diff --git a/spec/services/import/placeholder_references/push_service_spec.rb 
b/spec/services/import/placeholder_references/push_service_spec.rb index 38895f84c1cd60..72b8946eec649e 100644 --- a/spec/services/import/placeholder_references/push_service_spec.rb +++ b/spec/services/import/placeholder_references/push_service_spec.rb @@ -3,21 +3,20 @@ require 'spec_helper' RSpec.describe Import::PlaceholderReferences::PushService, :aggregate_failures, :clean_gitlab_redis_shared_state, feature_category: :importers do - let_it_be(:source_user) { create(:import_source_user) } - let_it_be(:record) { create(:merge_request) } - let(:import_source) { Import::SOURCE_DIRECT_TRANSFER } let(:import_uid) { 1 } - let(:composite_key) { nil } - let(:numeric_key) { record.id } describe '#execute' do + let(:composite_key) { nil } + let(:numeric_key) { 9 } + subject(:result) do described_class.new( import_source: import_source, import_uid: import_uid, - source_user: source_user, - model: record.class, + source_user_id: 123, + source_user_namespace_id: 234, + model: MergeRequest, user_reference_column: 'author_id', numeric_key: numeric_key, composite_key: composite_key @@ -25,7 +24,7 @@ end it 'pushes data to Redis' do - expected_result = [nil, 'MergeRequest', source_user.namespace_id, record.id, source_user.id, 'author_id'].to_json + expected_result = [nil, 'MergeRequest', 234, 9, 123, 'author_id'].to_json expect(result).to be_success expect(result.payload).to eq(compressed_reference: expected_result) @@ -38,7 +37,7 @@ it 'pushes data to Redis containing the composite_key' do expected_result = [ - { 'foo' => 1 }, 'MergeRequest', source_user.namespace_id, nil, source_user.id, 'author_id' + { 'foo' => 1 }, 'MergeRequest', 234, nil, 123, 'author_id' ].to_json expect(result).to be_success @@ -59,6 +58,9 @@ end describe '.from_record' do + let_it_be(:source_user) { create(:import_source_user) } + let_it_be(:record) { create(:merge_request) } + subject(:result) do described_class.from_record( import_source: import_source, diff --git a/spec/workers/import/load_placeholder_references_worker_spec.rb b/spec/workers/import/load_placeholder_references_worker_spec.rb new file mode 100644 index 00000000000000..92b53cd97778b4 --- /dev/null +++ b/spec/workers/import/load_placeholder_references_worker_spec.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Import::LoadPlaceholderReferencesWorker, feature_category: :importers do + let(:user) { create(:user) } + let(:import_source) { 'test_source' } + let(:uid) { 123 } + let(:params) { { 'current_user_id' => user.id } } + + describe '#perform' do + subject(:perform) { described_class.new.perform(import_source, uid, params) } + + it 'executes LoadService' do + expect_next_instance_of(Import::PlaceholderReferences::LoadService) do |service| + expect(service).to receive(:execute) + end + + perform + end + + it_behaves_like 'an idempotent worker' do + let(:job_args) { [import_source, uid, params] } + end + + context 'when bulk_import_user_mapping feature is disabled' do + before do + stub_feature_flags(bulk_import_user_mapping: false) + end + + it 'does not execute LoadService' do + expect(Import::PlaceholderReferences::LoadService).not_to receive(:new) + + perform + end + + context 'when bulk_import_user_mapping feature is enabled for the user' do + before do + stub_feature_flags(bulk_import_user_mapping: user) + end + + it 'executes LoadService' do + expect_next_instance_of(Import::PlaceholderReferences::LoadService) do |service| + expect(service).to receive(:execute) + end + + perform + end + end + end + end +end -- 
GitLab From d85e01514ed4cfdffe827b8686f7eef96e327a2c Mon Sep 17 00:00:00 2001 From: Luke Duncalfe Date: Tue, 9 Jul 2024 15:21:58 +1200 Subject: [PATCH 4/8] Add reviewer feedback, and handle invalid FK --- .../placeholder_references/load_service.rb | 11 ++- .../load_service_spec.rb | 88 +++++++++++++------ 2 files changed, 70 insertions(+), 29 deletions(-) diff --git a/app/services/import/placeholder_references/load_service.rb b/app/services/import/placeholder_references/load_service.rb index 1375b3fca99efd..54cb416ccf227c 100644 --- a/app/services/import/placeholder_references/load_service.rb +++ b/app/services/import/placeholder_references/load_service.rb @@ -51,7 +51,7 @@ def load!(batch) begin bulk_insert!(to_load) - rescue ActiveRecord::RecordInvalid => e # What other kinds of errors? + rescue ActiveRecord::RecordInvalid => e # We optimise for all records being valid and only filter for validity # when there was a problem to_load.reject! do |item| @@ -63,6 +63,9 @@ def load!(batch) # Try again bulk_insert!(to_load) + rescue ActiveRecord::InvalidForeignKey => e + # This is an unrecoverable situation where we allow the error to clear the batch + log_error(to_load, e) end clear_batch!(batch) @@ -84,8 +87,10 @@ def log_error(item, exception) super( message: 'Error processing placeholder reference', item: item, - exception_class: exception.class, - exception_message: exception.message + exception: { + class: exception.class, + message: exception.message + } ) self.error_count += 1 diff --git a/spec/services/import/placeholder_references/load_service_spec.rb b/spec/services/import/placeholder_references/load_service_spec.rb index 03dcca87be955d..602b0cb7f116bf 100644 --- a/spec/services/import/placeholder_references/load_service_spec.rb +++ b/spec/services/import/placeholder_references/load_service_spec.rb @@ -69,7 +69,10 @@ .with(kind_of(Array), batch_size: be_between(1, 2)) .twice .and_call_original - + expect(Gitlab::Cache::Import::Caching) + .to receive(:limited_values_from_set) + .twice + .and_call_original expect_log_message(:info, message: 'Processing placeholder references') expect_log_message(:info, message: 'Processed placeholder references', processed_count: 3, error_count: 0) expect(result).to be_success @@ -107,25 +110,29 @@ expect(set_members_count).to eq(1) end - context 'when something in the batch does not validate' do - let(:invalid_reference) { valid_reference.except(:source_user_id) } + context 'when something in the batch has an unexpected schema' do + let(:invalid_reference) { valid_reference.merge(foo: 'bar') } before do stub_const("#{described_class}::BATCH_LIMIT", 2) push(valid_reference) - push(invalid_reference) push(valid_reference_2) + # We can't use `#push` because that validates, + # so push directly to the set. 
+ Gitlab::Cache::Import::Caching.set_add(cache_key, invalid_reference.values.to_json) push(valid_reference_3) end - it 'loads just the valid data, and clears the list' do - expect_log_message(:info, message: 'Processing placeholder references') + it 'loads just the valid data, and clears the set' do + # expect_log_message(:info, message: 'Processing placeholder references') expect_log_message(:error, message: 'Error processing placeholder reference', - exception_class: ActiveRecord::RecordInvalid, - exception_message: "Validation failed: Source user can't be blank", - item: hash_including(invalid_reference.stringify_keys) + exception: { + class: Import::SourceUserPlaceholderReference::DecompressionError, + message: 'Import::SourceUserPlaceholderReference::DecompressionError' + }, + item: invalid_reference.values.to_json ) expect_log_message(:info, message: 'Processed placeholder references', processed_count: 4, error_count: 1) expect(result).to be_success @@ -139,27 +146,27 @@ end end - context 'when something in the batch has an unexpected schema' do - let(:invalid_reference) { valid_reference.merge(foo: 'bar') } + context 'when loading to PostgreSQL fails due to an ActiveRecord::RecordInvalid' do + let(:invalid_reference) { valid_reference.except(:source_user_id) } before do stub_const("#{described_class}::BATCH_LIMIT", 2) + push(invalid_reference) push(valid_reference) push(valid_reference_2) - # We can't use `#push` because that validates, - # so push directly to the set. - Gitlab::Cache::Import::Caching.set_add(cache_key, invalid_reference.values.to_json) push(valid_reference_3) end - it 'loads just the valid data, and clears the set' do - expect_log_message(:info, message: 'Processing placeholder references') + it 'loads just the valid data, and clears the list' do + # expect_log_message(:info, message: 'Processing placeholder references') expect_log_message(:error, message: 'Error processing placeholder reference', - exception_class: Import::SourceUserPlaceholderReference::DecompressionError, - exception_message: 'Import::SourceUserPlaceholderReference::DecompressionError', - item: invalid_reference.values.to_json + exception: { + class: ActiveRecord::RecordInvalid, + message: "Validation failed: Source user can't be blank" + }, + item: hash_including(invalid_reference.stringify_keys) ) expect_log_message(:info, message: 'Processed placeholder references', processed_count: 4, error_count: 1) expect(result).to be_success @@ -173,11 +180,40 @@ end end - context 'when loading to PostgreSQL fails' do + context 'when loading to PostgreSQL fails due to ActiveRecord::InvalidForeignKey' do + let(:invalid_reference) { valid_reference.merge(source_user_id: non_existing_record_id) } + + before do + stub_const("#{described_class}::BATCH_LIMIT", 2) + + push(invalid_reference) + push(valid_reference) + push(valid_reference_2) + push(valid_reference_3) + end + + it 'logs the error and clears the failing batch but continues' do + expect_log_message(:info, message: 'Processing placeholder references') + expect_log_message(:error, + message: 'Error processing placeholder reference', + exception: { + class: ActiveRecord::InvalidForeignKey, + message: include('PG::ForeignKeyViolation') + }, + item: have_attributes(size: 2).and(include(have_attributes(invalid_reference))) + ) + expect_log_message(:info, message: 'Processed placeholder references', processed_count: 4, error_count: 1) + expect(result).to be_success + expect(Import::SourceUserPlaceholderReference.count).to eq(2) + 
expect(set_members_count).to eq(0) + end + end + + context 'when loading to PostgreSQL fails for an unhandled reason' do before do allow(Import::SourceUserPlaceholderReference) - .to receive(:bulk_insert!) - .and_raise(ActiveRecord::ConnectionTimeoutError) + .to receive(:bulk_insert!) + .and_raise(ActiveRecord::ConnectionTimeoutError) end it 'bubbles the exception and does not clear the set' do @@ -208,15 +244,15 @@ context 'when clearing the set from Redis fails' do before do allow(Gitlab::Cache::Import::Caching) - .to receive(:set_remove) - .and_raise(Redis::ConnectionError) + .to receive(:set_remove) + .and_raise(Redis::ConnectionError) end it 'bubbles the exception and does not clear the set, but does load the data' do push(valid_reference) expect { result }.to raise_error(Redis::ConnectionError) - # (We tolerate that this leads to duplicate records!) + # (We tolerate that this leads to duplicate records loaded to PostgreSQL!) expect(Import::SourceUserPlaceholderReference.count).to eq(1) expect(set_members_count).to eq(1) end @@ -228,9 +264,9 @@ def push(reference) end def expect_log_message(type, **params) + allow(Import::Framework::Logger).to receive(type) expect(Import::Framework::Logger).to receive(type) .with(params.merge(import_source: import_source, import_uid: import_uid)) - .and_call_original end def set_members_count -- GitLab From 97b41ed7298223f63063ec58720122a11245d728 Mon Sep 17 00:00:00 2001 From: Luke Duncalfe Date: Wed, 10 Jul 2024 08:08:12 +1200 Subject: [PATCH 5/8] spec tidy ups --- .../import/placeholder_references/load_service_spec.rb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/spec/services/import/placeholder_references/load_service_spec.rb b/spec/services/import/placeholder_references/load_service_spec.rb index 602b0cb7f116bf..3109b2ffc9355a 100644 --- a/spec/services/import/placeholder_references/load_service_spec.rb +++ b/spec/services/import/placeholder_references/load_service_spec.rb @@ -159,7 +159,6 @@ end it 'loads just the valid data, and clears the list' do - # expect_log_message(:info, message: 'Processing placeholder references') expect_log_message(:error, message: 'Error processing placeholder reference', exception: { @@ -168,7 +167,6 @@ }, item: hash_including(invalid_reference.stringify_keys) ) - expect_log_message(:info, message: 'Processed placeholder references', processed_count: 4, error_count: 1) expect(result).to be_success expect(result.payload).to eq(processed_count: 4, error_count: 1) expect(Import::SourceUserPlaceholderReference.all).to contain_exactly( @@ -193,7 +191,6 @@ end it 'logs the error and clears the failing batch but continues' do - expect_log_message(:info, message: 'Processing placeholder references') expect_log_message(:error, message: 'Error processing placeholder reference', exception: { @@ -202,7 +199,6 @@ }, item: have_attributes(size: 2).and(include(have_attributes(invalid_reference))) ) - expect_log_message(:info, message: 'Processed placeholder references', processed_count: 4, error_count: 1) expect(result).to be_success expect(Import::SourceUserPlaceholderReference.count).to eq(2) expect(set_members_count).to eq(0) -- GitLab From 87de92f74d068d19bb6659a242074321787f98ba Mon Sep 17 00:00:00 2001 From: Luke Duncalfe Date: Thu, 11 Jul 2024 11:08:49 +1200 Subject: [PATCH 6/8] Add reviewer feedback --- app/services/import/placeholder_references/load_service.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/services/import/placeholder_references/load_service.rb 
b/app/services/import/placeholder_references/load_service.rb index 54cb416ccf227c..e455e889b9cf38 100644 --- a/app/services/import/placeholder_references/load_service.rb +++ b/app/services/import/placeholder_references/load_service.rb @@ -21,6 +21,9 @@ def execute load!(batch) + # End this loop if we know that we cleared the set earlier. + # This will prevent looping further and processing just a few records at a time if an import is simultaneously + # writing data to Redis. See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/156704#note_1988728000 break if batch.size < BATCH_LIMIT end -- GitLab From 517ef7a283f7375d244710478c027e3e8026e2fe Mon Sep 17 00:00:00 2001 From: Luke Duncalfe Date: Fri, 12 Jul 2024 10:41:12 +1200 Subject: [PATCH 7/8] Add reviewer feedback --- .../source_user_placeholder_reference.rb | 18 +++++------ .../placeholder_references/load_service.rb | 12 +++----- .../placeholder_references/push_service.rb | 6 ++-- .../source_user_placeholder_reference_spec.rb | 30 +++++++++---------- .../load_service_spec.rb | 9 +++--- .../push_service_spec.rb | 6 ++-- 6 files changed, 38 insertions(+), 43 deletions(-) diff --git a/app/models/import/source_user_placeholder_reference.rb b/app/models/import/source_user_placeholder_reference.rb index 5a866c87d4929c..9441336e48d84a 100644 --- a/app/models/import/source_user_placeholder_reference.rb +++ b/app/models/import/source_user_placeholder_reference.rb @@ -18,10 +18,10 @@ class SourceUserPlaceholderReference < ApplicationRecord attribute :composite_key, :ind_jsonb - # If an element is ever added to this array, ensure that `.from_compressed` handles receiving + # If an element is ever added to this array, ensure that `.from_serialized` handles receiving # older versions of the array by filling in missing values with defaults. We'd keep that in place # for at least one release cycle to ensure backward compatibility. - COMPRESSABLE_ATTRIBUTES = %w[ + SERIALIZABLE_ATTRIBUTES = %w[ composite_key model namespace_id @@ -30,19 +30,19 @@ class SourceUserPlaceholderReference < ApplicationRecord user_reference_column ].freeze - DecompressionError = Class.new(StandardError) + SerializationError = Class.new(StandardError) - def to_compressed - Gitlab::Json.dump(attributes.slice(*COMPRESSABLE_ATTRIBUTES).to_h.values) + def to_serialized + Gitlab::Json.dump(attributes.slice(*SERIALIZABLE_ATTRIBUTES).to_h.values) end class << self - def from_compressed(compressed_reference) - uncompressed = Gitlab::Json.parse(compressed_reference) + def from_serialized(serialized_reference) + deserialized = Gitlab::Json.parse(serialized_reference) - raise DecompressionError if uncompressed.size != COMPRESSABLE_ATTRIBUTES.size + raise SerializationError if deserialized.size != SERIALIZABLE_ATTRIBUTES.size - attributes = COMPRESSABLE_ATTRIBUTES.zip(uncompressed).to_h + attributes = SERIALIZABLE_ATTRIBUTES.zip(deserialized).to_h new(attributes.merge(created_at: Time.zone.now)) end diff --git a/app/services/import/placeholder_references/load_service.rb b/app/services/import/placeholder_references/load_service.rb index e455e889b9cf38..d448d02a8f9b4d 100644 --- a/app/services/import/placeholder_references/load_service.rb +++ b/app/services/import/placeholder_references/load_service.rb @@ -15,15 +15,11 @@ def initialize(import_source:, import_uid:) def execute log_info(message: 'Processing placeholder references') - loop do - batch = next_batch - break if batch.empty? - + while (batch = next_batch).present? 
load!(batch) # End this loop if we know that we cleared the set earlier. - # This will prevent looping further and processing just a few records at a time if an import is simultaneously - # writing data to Redis. See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/156704#note_1988728000 + # This prevents processing just a few records at a time if an import is simultaneously writing data to Redis. break if batch.size < BATCH_LIMIT end @@ -46,8 +42,8 @@ def next_batch def load!(batch) to_load = batch.filter_map do |item| - SourceUserPlaceholderReference.from_compressed(item) - rescue JSON::ParserError, SourceUserPlaceholderReference::DecompressionError => e + SourceUserPlaceholderReference.from_serialized(item) + rescue JSON::ParserError, SourceUserPlaceholderReference::SerializationError => e log_error(item, e) nil end diff --git a/app/services/import/placeholder_references/push_service.rb b/app/services/import/placeholder_references/push_service.rb index 84705f27183744..5f1c119c9a373a 100644 --- a/app/services/import/placeholder_references/push_service.rb +++ b/app/services/import/placeholder_references/push_service.rb @@ -36,11 +36,11 @@ def initialize(import_source:, import_uid:, source_user_id:, source_user_namespa def execute return error(reference.errors.full_messages, :bad_request) unless reference.valid? - compressed_reference = reference.to_compressed + serialized_reference = reference.to_serialized - cache.set_add(cache_key, compressed_reference, timeout: cache_ttl) + cache.set_add(cache_key, serialized_reference, timeout: cache_ttl) - success(compressed_reference: compressed_reference) + success(serialized_reference: serialized_reference) end private diff --git a/spec/models/import/source_user_placeholder_reference_spec.rb b/spec/models/import/source_user_placeholder_reference_spec.rb index b9abb566141eba..f003014564a04f 100644 --- a/spec/models/import/source_user_placeholder_reference_spec.rb +++ b/spec/models/import/source_user_placeholder_reference_spec.rb @@ -43,8 +43,8 @@ def validation_errors(...) expect { reference.source_user.destroy! }.to change { described_class.count }.by(-1) end - describe 'COMPRESSABLE_ATTRIBUTES' do - subject(:constant) { described_class::COMPRESSABLE_ATTRIBUTES } + describe 'SERIALIZABLE_ATTRIBUTES' do + subject(:constant) { described_class::SERIALIZABLE_ATTRIBUTES } it 'is the expected list of attribute names' do expected_elements = %w[ @@ -57,17 +57,17 @@ def validation_errors(...) ] failure_message = <<-MSG - Before fixing this spec, ensure that `#{described_class.name}.from_compressed` + Before fixing this spec, ensure that `#{described_class.name}.from_serialized` handles receiving older versions of the array by filling in missing values with defaults. - You are seeing this message because COMPRESSABLE_ATTRIBUTES has changed. + You are seeing this message because SERIALIZABLE_ATTRIBUTES has changed. MSG expect(constant).to eq(expected_elements), failure_message end end - describe '#to_compressed' do + describe '#to_serialized' do let(:reference) do build(:import_source_user_placeholder_reference, numeric_key: 1, @@ -79,19 +79,19 @@ def validation_errors(...) 
) end - subject(:compressed) { reference.to_compressed } + subject(:serialized) { reference.to_serialized } it { is_expected.to eq('[{"key":1},"Model",2,1,3,"foo"]') } end - describe '.from_compressed' do - subject(:from_compressed) { described_class.from_compressed(compressed) } + describe '.from_serialized' do + subject(:from_serialized) { described_class.from_serialized(serialized) } - context 'when compressed reference is valid' do - let(:compressed) { '[{"key":1},"Model",2,null,3,"foo"]' } + context 'when serialized reference is valid' do + let(:serialized) { '[{"key":1},"Model",2,null,3,"foo"]' } it 'returns a valid SourceUserPlaceholderReference' do - expect(from_compressed).to be_a(described_class) + expect(from_serialized).to be_a(described_class) .and(be_valid) .and(have_attributes( composite_key: { key: 1 }, @@ -104,15 +104,15 @@ def validation_errors(...) end it 'sets the created_at' do - expect(from_compressed.created_at).to be_like_time(Time.zone.now) + expect(from_serialized.created_at).to be_like_time(Time.zone.now) end end - context 'when compressed reference has different number of elements than expected' do - let(:compressed) { '[{"key":1},"Model",2,null,3]' } + context 'when serialized reference has different number of elements than expected' do + let(:serialized) { '[{"key":1},"Model",2,null,3]' } it 'raises an exception' do - expect { from_compressed }.to raise_error(described_class::DecompressionError) + expect { from_serialized }.to raise_error(described_class::SerializationError) end end end diff --git a/spec/services/import/placeholder_references/load_service_spec.rb b/spec/services/import/placeholder_references/load_service_spec.rb index 3109b2ffc9355a..567b7f07c44d97 100644 --- a/spec/services/import/placeholder_references/load_service_spec.rb +++ b/spec/services/import/placeholder_references/load_service_spec.rb @@ -125,12 +125,11 @@ end it 'loads just the valid data, and clears the set' do - # expect_log_message(:info, message: 'Processing placeholder references') expect_log_message(:error, message: 'Error processing placeholder reference', exception: { - class: Import::SourceUserPlaceholderReference::DecompressionError, - message: 'Import::SourceUserPlaceholderReference::DecompressionError' + class: Import::SourceUserPlaceholderReference::SerializationError, + message: 'Import::SourceUserPlaceholderReference::SerializationError' }, item: invalid_reference.values.to_json ) @@ -255,8 +254,8 @@ end def push(reference) - compressed = Import::SourceUserPlaceholderReference.new(reference).to_compressed - Gitlab::Cache::Import::Caching.set_add(cache_key, compressed) + serialized = Import::SourceUserPlaceholderReference.new(reference).to_serialized + Gitlab::Cache::Import::Caching.set_add(cache_key, serialized) end def expect_log_message(type, **params) diff --git a/spec/services/import/placeholder_references/push_service_spec.rb b/spec/services/import/placeholder_references/push_service_spec.rb index 72b8946eec649e..b6de3af4337793 100644 --- a/spec/services/import/placeholder_references/push_service_spec.rb +++ b/spec/services/import/placeholder_references/push_service_spec.rb @@ -27,7 +27,7 @@ expected_result = [nil, 'MergeRequest', 234, 9, 123, 'author_id'].to_json expect(result).to be_success - expect(result.payload).to eq(compressed_reference: expected_result) + expect(result.payload).to eq(serialized_reference: expected_result) expect(set).to contain_exactly(expected_result) end @@ -41,7 +41,7 @@ ].to_json expect(result).to be_success - 
expect(result.payload).to eq(compressed_reference: expected_result) + expect(result.payload).to eq(serialized_reference: expected_result) expect(set).to contain_exactly(expected_result) end end @@ -75,7 +75,7 @@ expected_result = [nil, 'MergeRequest', source_user.namespace_id, record.id, source_user.id, 'author_id'].to_json expect(result).to be_success - expect(result.payload).to eq(compressed_reference: expected_result) + expect(result.payload).to eq(serialized_reference: expected_result) expect(set).to contain_exactly(expected_result) end end -- GitLab From ca539bc86107925017ced88da827b77c2ae99900 Mon Sep 17 00:00:00 2001 From: Luke Duncalfe Date: Mon, 15 Jul 2024 16:55:13 +1200 Subject: [PATCH 8/8] Add reviewer feedback --- app/services/import/placeholder_references/load_service.rb | 2 +- .../import/placeholder_references/load_service_spec.rb | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/app/services/import/placeholder_references/load_service.rb b/app/services/import/placeholder_references/load_service.rb index d448d02a8f9b4d..c18a155a62216e 100644 --- a/app/services/import/placeholder_references/load_service.rb +++ b/app/services/import/placeholder_references/load_service.rb @@ -71,7 +71,7 @@ def load!(batch) end def bulk_insert!(to_load) - Import::SourceUserPlaceholderReference.bulk_insert!(to_load, batch_size: to_load.size) + Import::SourceUserPlaceholderReference.bulk_insert!(to_load) end def clear_batch!(batch) diff --git a/spec/services/import/placeholder_references/load_service_spec.rb b/spec/services/import/placeholder_references/load_service_spec.rb index 567b7f07c44d97..b12646aa4aebf7 100644 --- a/spec/services/import/placeholder_references/load_service_spec.rb +++ b/spec/services/import/placeholder_references/load_service_spec.rb @@ -37,7 +37,7 @@ expect(Import::SourceUserPlaceholderReference) .to receive(:bulk_insert!) - .with(kind_of(Array), batch_size: 1) + .with(kind_of(Array)) .and_call_original expect_log_message(:info, message: 'Processing placeholder references') @@ -66,7 +66,7 @@ expect(Import::SourceUserPlaceholderReference) .to receive(:bulk_insert!) - .with(kind_of(Array), batch_size: be_between(1, 2)) + .with(kind_of(Array)) .twice .and_call_original expect(Gitlab::Cache::Import::Caching) -- GitLab
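
Taken together, the end state of the series gives importers this shape of API.
A rough usage sketch follows; the service names, keyword arguments, and payload
key come from the diffs above, while the surrounding wiring and literal values
are illustrative only.

# During an import, each user-attributed record is validated, serialized,
# and pushed onto a per-import Redis set:
result = Import::PlaceholderReferences::PushService.new(
  import_source: Import::SOURCE_DIRECT_TRANSFER,
  import_uid: 1, # illustrative
  source_user_id: source_user.id,
  source_user_namespace_id: source_user.namespace_id,
  model: MergeRequest,
  user_reference_column: 'author_id',
  numeric_key: merge_request.id,
  composite_key: nil
).execute

result.payload[:serialized_reference]
# => '[null,"MergeRequest",<namespace_id>,<record_id>,<source_user_id>,"author_id"]'

# Later, LoadService (scheduled via LoadPlaceholderReferencesWorker) drains
# the set in batches of up to BATCH_LIMIT, rebuilds each record with
# .from_serialized, bulk-inserts the batch into PostgreSQL, and only then
# removes the loaded members from the set:
Import::PlaceholderReferences::LoadService.new(
  import_source: Import::SOURCE_DIRECT_TRANSFER,
  import_uid: 1
).execute
# => success, with payload { processed_count: ..., error_count: ... }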