diff --git a/app/models/concerns/counter_attribute.rb b/app/models/concerns/counter_attribute.rb index 64d178b750749c6341b97237d4674c7dabe17be4..058cb7752f65885455a43af5581cd4a6a703ea5e 100644 --- a/app/models/concerns/counter_attribute.rb +++ b/app/models/concerns/counter_attribute.rb @@ -95,7 +95,7 @@ def flush_increments_to_database!(attribute) next if increment_value == 0 transaction do - unsafe_update_counters(id, attribute => increment_value) + update_counters_with_lease({ attribute => increment_value }) redis_state { |redis| redis.del(flushed_key) } new_db_value = reset.read_attribute(attribute) end @@ -130,9 +130,18 @@ def increment_counter(attribute, increment) end end - def clear_counter!(attribute) + def update_counters_with_lease(increments) + detect_race_on_record(log_fields: increments.merge({ caller: __method__ })) do + self.class.update_counters(id, increments) + end + end + + def reset_counter!(attribute) if counter_attribute_enabled?(attribute) - redis_state { |redis| redis.del(counter_key(attribute)) } + detect_race_on_record(log_fields: { caller: __method__ }) do + update!(attribute => 0) + clear_counter!(attribute) + end log_clear_counter(attribute) end @@ -164,14 +173,20 @@ def counter_attribute_enabled?(attribute) private + def database_lock_key + "project:{#{project_id}}:#{self.class}:#{id}" + end + def steal_increments(increment_key, flushed_key) redis_state do |redis| redis.eval(LUA_STEAL_INCREMENT_SCRIPT, keys: [increment_key, flushed_key]) end end - def unsafe_update_counters(id, increments) - self.class.update_counters(id, increments) + def clear_counter!(attribute) + redis_state do |redis| + redis.del(counter_key(attribute)) + end end def execute_after_flush_callbacks @@ -192,6 +207,32 @@ def with_exclusive_lease(lock_key) # a worker is already updating the counters end + # detect_race_on_record uses a lease to monitor access + # to the project statistics row. 
This is needed to detect + # concurrent attempts to increment columns, which could result in a + # race condition. + # + # As the purpose is to detect and warn about concurrent attempts, + # it falls back to a direct update on the row if it fails to obtain the lease. + # + # It does not guarantee that there will not be any concurrent updates. + def detect_race_on_record(log_fields: {}) + return yield unless Feature.enabled?(:counter_attribute_db_lease_for_update, project) + + in_lock(database_lock_key, retries: 2) do + yield + end + rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError + Gitlab::AppLogger.warn( + message: 'Concurrent update to project statistics detected', + project_statistics_id: id, + **log_fields, + **Gitlab::ApplicationContext.current + ) + + yield + end + def log_increment_counter(attribute, increment, new_value) payload = Gitlab::ApplicationContext.current.merge( message: 'Increment counter attribute', diff --git a/app/models/project_statistics.rb b/app/models/project_statistics.rb index a91e029143886041412ea4b70eee80201aaa2f87..40838fe8726c98bc68f239d5b466695bb1ec4c0a 100644 --- a/app/models/project_statistics.rb +++ b/app/models/project_statistics.rb @@ -49,7 +49,9 @@ def refresh!(only: []) schedule_namespace_aggregation_worker end - save! + detect_race_on_record(log_fields: { caller: __method__ }) do + save! + end end def update_commit_count @@ -110,8 +112,10 @@ def update_storage_size end def refresh_storage_size! - update_storage_size - save! + detect_race_on_record(log_fields: { caller: __method__ }) do + update_storage_size + save! 
+ end end # Since this incremental update method does not call update_storage_size above through before_save, @@ -129,36 +133,34 @@ def self.increment_statistic(project, key, amount) if counter_attribute_enabled?(key) project_statistics.delayed_increment_counter(key, amount) else - legacy_increment_statistic(project, key, amount) + project_statistics.legacy_increment_statistic(key, amount) end end end - def self.legacy_increment_statistic(project, key, amount) - where(project_id: project.id).columns_to_increment(key, amount) + def self.incrementable_attribute?(key) + INCREMENTABLE_COLUMNS.key?(key) || counter_attribute_enabled?(key) + end + + def legacy_increment_statistic(key, amount) + increment_columns!(key, amount) Namespaces::ScheduleAggregationWorker.perform_async( # rubocop: disable CodeReuse/Worker project.namespace_id) end - def self.columns_to_increment(key, amount) - updates = ["#{key} = COALESCE(#{key}, 0) + (#{amount})"] + private - if (additional = INCREMENTABLE_COLUMNS[key]) - additional.each do |column| - updates << "#{column} = COALESCE(#{column}, 0) + (#{amount})" - end + def increment_columns!(key, amount) + increments = { key => amount } + additional = INCREMENTABLE_COLUMNS.fetch(key, []) + additional.each do |column| + increments[column] = amount end - update_all(updates.join(', ')) + update_counters_with_lease(increments) end - def self.incrementable_attribute?(key) - INCREMENTABLE_COLUMNS.key?(key) || counter_attribute_enabled?(key) - end - - private - def schedule_namespace_aggregation_worker run_after_commit do Namespaces::ScheduleAggregationWorker.perform_async(project.namespace_id) diff --git a/app/models/projects/build_artifacts_size_refresh.rb b/app/models/projects/build_artifacts_size_refresh.rb index e66e1d5b42f7a26140beecbe17feeb8059a4d823..2ffc7478178f7ae2411e14881fa0d8ddd43e2088 100644 --- a/app/models/projects/build_artifacts_size_refresh.rb +++ b/app/models/projects/build_artifacts_size_refresh.rb @@ -80,9 +80,7 @@ def 
self.process_next_refresh! end def reset_project_statistics! - statistics = project.statistics - statistics.update!(build_artifacts_size: 0) - statistics.clear_counter!(:build_artifacts_size) + project.statistics.reset_counter!(:build_artifacts_size) end def next_batch(limit:) diff --git a/config/feature_flags/development/counter_attribute_db_lease_for_update.yml b/config/feature_flags/development/counter_attribute_db_lease_for_update.yml new file mode 100644 index 0000000000000000000000000000000000000000..7c30bb3e913a4764472700589da7323f4bf54a35 --- /dev/null +++ b/config/feature_flags/development/counter_attribute_db_lease_for_update.yml @@ -0,0 +1,8 @@ +--- +name: counter_attribute_db_lease_for_update +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/97912 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/374596 +milestone: '15.5' +type: development +group: group::pipeline insights +default_enabled: false diff --git a/spec/models/project_statistics_spec.rb b/spec/models/project_statistics_spec.rb index b2158baa67088d01daec51bfed3b0cabeaa05de0..f3f1e43f6c5ceb8712ba3136adefa457446821fd 100644 --- a/spec/models/project_statistics_spec.rb +++ b/spec/models/project_statistics_spec.rb @@ -98,6 +98,8 @@ end describe '#refresh!' do + subject { statistics.refresh! } + before do allow(statistics).to receive(:update_commit_count) allow(statistics).to receive(:update_repository_size) @@ -111,7 +113,7 @@ context "without arguments" do before do - statistics.refresh! + subject end it "sums all counters" do @@ -146,7 +148,7 @@ expect(project.repository.exists?).to be_falsey expect(project.wiki.repository.exists?).to be_falsey - statistics.refresh! + subject expect(statistics).to have_received(:update_commit_count) expect(statistics).to have_received(:update_repository_size) @@ -174,7 +176,7 @@ end it 'does not crash' do - statistics.refresh! 
+ subject expect(statistics).to have_received(:update_commit_count) expect(statistics).to have_received(:update_repository_size) @@ -209,7 +211,7 @@ expect(Namespaces::ScheduleAggregationWorker) .to receive(:perform_async) - statistics.refresh! + subject end end end @@ -238,9 +240,13 @@ expect(Namespaces::ScheduleAggregationWorker) .not_to receive(:perform_async) - statistics.refresh! + subject end end + + it_behaves_like 'obtaining lease to update database' do + let(:model) { statistics } + end end describe '#update_commit_count' do @@ -408,6 +414,8 @@ end describe '#refresh_storage_size!' do + subject { statistics.refresh_storage_size! } + it 'recalculates storage size from its components and save it' do statistics.update_columns( repository_size: 2, @@ -422,7 +430,11 @@ storage_size: 0 ) - expect { statistics.refresh_storage_size! }.to change { statistics.storage_size }.from(0).to(28) + expect { subject }.to change { statistics.storage_size }.from(0).to(28) + end + + it_behaves_like 'obtaining lease to update database' do + let(:model) { statistics } end end diff --git a/spec/support/helpers/exclusive_lease_helpers.rb b/spec/support/helpers/exclusive_lease_helpers.rb index 95cfc56c273e9541973796ba8269b86f3930dd49..06e5ae5427cb193e36a705b6fc3dc33de6d21a0e 100644 --- a/spec/support/helpers/exclusive_lease_helpers.rb +++ b/spec/support/helpers/exclusive_lease_helpers.rb @@ -2,6 +2,8 @@ module ExclusiveLeaseHelpers def stub_exclusive_lease(key = nil, uuid = 'uuid', renew: false, timeout: nil) + prepare_exclusive_lease_stub + key ||= instance_of(String) timeout ||= instance_of(Integer) @@ -37,4 +39,21 @@ def expect_to_cancel_exclusive_lease(key, uuid) .to receive(:cancel) .with(key, uuid) end + + private + + # This prepares the stub to be able to stub specific lease keys + # while allowing unstubbed lease keys to behave as original. 
+ # + # allow(Gitlab::ExclusiveLease).to receive(:new).and_call_original + # can only be called once to prevent resetting stubs when + # `stub_exclusive_lease` is called multiple times. + def prepare_exclusive_lease_stub + return if @exclusive_lease_allowed_to_call_original + + allow(Gitlab::ExclusiveLease) + .to receive(:new).and_call_original + + @exclusive_lease_allowed_to_call_original = true + end end diff --git a/spec/support/shared_examples/models/concerns/counter_attribute_shared_examples.rb b/spec/support/shared_examples/models/concerns/counter_attribute_shared_examples.rb index 91e517270ffece4ec727e0a52469162e9343fc42..ea4ba536403e7814b050d1610cf11b2f2387a5ba 100644 --- a/spec/support/shared_examples/models/concerns/counter_attribute_shared_examples.rb +++ b/spec/support/shared_examples/models/concerns/counter_attribute_shared_examples.rb @@ -92,8 +92,8 @@ it 'obtains an exclusive lease during processing' do expect(model) .to receive(:in_lock) - .with(model.counter_lock_key(incremented_attribute), ttl: described_class::WORKER_LOCK_TTL) - .and_call_original + .with(model.counter_lock_key(incremented_attribute), ttl: described_class::WORKER_LOCK_TTL) + .and_call_original subject end @@ -186,31 +186,88 @@ end end - describe '#clear_counter!' do + describe '#reset_counter!' 
do let(:attribute) { counter_attributes.first } before do + model.update!(attribute => 123) model.increment_counter(attribute, 10) end - it 'deletes the counter key for the given attribute and logs it' do - expect(Gitlab::AppLogger).to receive(:info).with( - hash_including( - message: 'Clear counter attribute', - attribute: attribute, - project_id: model.project_id, - 'correlation_id' => an_instance_of(String), - 'meta.feature_category' => 'test', - 'meta.caller_id' => 'caller' - ) - ) + subject { model.reset_counter!(attribute) } - model.clear_counter!(attribute) + it 'resets the attribute value to 0 and clears existing counter', :aggregate_failures do + expect { subject }.to change { model.reload.send(attribute) }.from(123).to(0) Gitlab::Redis::SharedState.with do |redis| key_exists = redis.exists?(model.counter_key(attribute)) expect(key_exists).to be_falsey end end + + it_behaves_like 'obtaining lease to update database' do + context 'when the execution raises error' do + before do + allow(model).to receive(:update!).and_raise(StandardError, 'Something went wrong') + end + + it 'reraises error' do + expect { subject }.to raise_error(StandardError, 'Something went wrong') + end + end + end + end + + describe '#update_counters_with_lease' do + let(:increments) { { build_artifacts_size: 1, packages_size: 2 } } + + subject { model.update_counters_with_lease(increments) } + + it 'updates counters of the record' do + expect { subject } + .to change { model.reload.build_artifacts_size }.by(1) + .and change { model.reload.packages_size }.by(2) + end + + it_behaves_like 'obtaining lease to update database' do + context 'when the execution raises error' do + before do + allow(model.class).to receive(:update_counters).and_raise(StandardError, 'Something went wrong') + end + + it 'reraises error' do + expect { subject }.to raise_error(StandardError, 'Something went wrong') + end + end + end + end +end + +RSpec.shared_examples 'obtaining lease to update database' do + 
context 'when it is unable to obtain lock' do + before do + allow(model).to receive(:in_lock).and_raise(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError) + end + + it 'logs a warning' do + allow(model).to receive(:in_lock).and_raise(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError) + + expect(Gitlab::AppLogger).to receive(:warn).once + + expect { subject }.not_to raise_error + end + end + + context 'when feature flag counter_attribute_db_lease_for_update is disabled' do + before do + stub_feature_flags(counter_attribute_db_lease_for_update: false) + allow(model).to receive(:in_lock).and_call_original + end + + it 'does not attempt to get a lock' do + expect(model).not_to receive(:in_lock) + + subject + end end end