From 0fccb58c3c8be6c4a073a3cace8d7fdae7e4201c Mon Sep 17 00:00:00 2001 From: Nicolae Rotaru Date: Thu, 4 Dec 2025 12:19:04 +0100 Subject: [PATCH 01/10] Implement Async Worker for Security Scan Profile Bulk Operations Changelog: added EE: true --- config/sidekiq_queues.yml | 2 + ee/app/workers/all_queues.yml | 10 ++ .../security/scan_profiles/attach_worker.rb | 69 ++++++++ .../scan_profiles/attach_worker_spec.rb | 160 ++++++++++++++++++ 4 files changed, 241 insertions(+) create mode 100644 ee/app/workers/security/scan_profiles/attach_worker.rb create mode 100644 ee/spec/workers/security/scan_profiles/attach_worker_spec.rb diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 51bafafa050438..17f4f4589b75d2 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -1057,6 +1057,8 @@ - 1 - - security_scan_execution_policies_rule_schedule - 1 +- - security_scan_profiles_attach + - 1 - - security_scan_profiles_process_project_transfer_events - 1 - - security_scan_result_policies_add_approvers_to_rules diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml index f202a270c6976a..f388038df881d4 100644 --- a/ee/app/workers/all_queues.yml +++ b/ee/app/workers/all_queues.yml @@ -4126,6 +4126,16 @@ :idempotent: false :tags: [] :queue_namespace: +- :name: security_scan_profiles_attach + :worker_name: Security::ScanProfiles::AttachWorker + :feature_category: :security_policy_management + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] + :queue_namespace: - :name: security_scan_profiles_process_project_transfer_events :worker_name: Security::ScanProfiles::ProcessProjectTransferEventsWorker :feature_category: :security_asset_inventories diff --git a/ee/app/workers/security/scan_profiles/attach_worker.rb b/ee/app/workers/security/scan_profiles/attach_worker.rb new file mode 100644 index 00000000000000..268060055b1a44 --- /dev/null +++ b/ee/app/workers/security/scan_profiles/attach_worker.rb @@ -0,0 +1,69 @@ +# frozen_string_literal: true + +module Security + module ScanProfiles + class AttachWorker + include ApplicationWorker + + data_consistency :sticky + idempotent! + deduplicate :until_executed + + concurrency_limit -> { 200 } + + feature_category :security_policy_management + + BATCH_SIZE = 500 + + def perform(group_id, scan_profile_id) + group = Group.find_by_id(group_id) + return unless group + + scan_profile = Security::ScanProfile.find_by_id(scan_profile_id) + return unless scan_profile + return unless scan_profile.namespace_id == group.root_ancestor.id + + attach_profile_to_projects(group, scan_profile) + end + + private + + def attach_profile_to_projects(group, scan_profile) + project_ids = Gitlab::Database::NamespaceProjectIdsEachBatch.new( + group_id: group.id, + batch_size: BATCH_SIZE + ).execute + + return if project_ids.empty? + + bulk_insert_profile_projects(project_ids, scan_profile) + rescue StandardError => e + Gitlab::ErrorTracking.track_exception(e, { + group_id: group.id, + scan_profile_id: scan_profile.id, + project_ids_count: project_ids&.size + }) + end + + def bulk_insert_profile_projects(project_ids, scan_profile) + timestamp = Time.current + + project_ids.each_slice(BATCH_SIZE) do |batch_ids| + attributes = batch_ids.map do |project_id| + { + project_id: project_id, + security_scan_profile_id: scan_profile.id, + created_at: timestamp, + updated_at: timestamp + } + end + + Security::ScanProfileProject.insert_all( + attributes, + unique_by: [:project_id, :security_scan_profile_id] + ) + end + end + end + end +end diff --git a/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb new file mode 100644 index 00000000000000..36c6398ac43021 --- /dev/null +++ b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb @@ -0,0 +1,160 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Security::ScanProfiles::AttachWorker, feature_category: :security_policy_management do + let_it_be(:root_group) { create(:group) } + let_it_be(:subgroup) { create(:group, parent: root_group) } + let_it_be(:nested_subgroup) { create(:group, parent: subgroup) } + + let_it_be(:project1) { create(:project, namespace: root_group) } + let_it_be(:project2) { create(:project, namespace: subgroup) } + let_it_be(:project3) { create(:project, namespace: nested_subgroup) } + + let_it_be(:scan_profile) do + create(:security_scan_profile, namespace: root_group, scan_type: :secret_detection) + end + + let(:group) { root_group } + let(:group_id) { group.id } + let(:scan_profile_id) { scan_profile.id } + + subject(:worker) { described_class.new } + + describe '#perform' do + it 'attaches the scan profile to all projects in the group hierarchy' do + expect { worker.perform(group_id, scan_profile_id) } + .to change { Security::ScanProfileProject.count }.by(3) + + expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) + .to be(true) + expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) + .to be(true) + expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) + .to be(true) + end + + context 'when called for a subgroup' do + let(:group) { subgroup } + + it 'attaches the scan profile to projects in the subgroup and its descendants' do + expect { worker.perform(group_id, scan_profile_id) } + .to change { Security::ScanProfileProject.count }.by(2) + + expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) + .to be(false) + expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) + .to be(true) + expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) + .to be(true) + end + end + + context 'when the profile is already attached to some projects' do + before do + create(:security_scan_profile_project, project: project1, scan_profile: scan_profile) + end + + it 'is idempotent and does not create duplicate records' do + expect { worker.perform(group_id, scan_profile_id) } + .to change { Security::ScanProfileProject.count }.by(2) + + # Running again should not create duplicates + expect { worker.perform(group_id, scan_profile_id) } + .not_to change { Security::ScanProfileProject.count } + + expect(Security::ScanProfileProject.where(project: project1, security_scan_profile_id: scan_profile.id).count) + .to eq(1) + end + end + + context 'when group does not exist' do + let(:group_id) { non_existing_record_id } + + it 'exits gracefully without creating records' do + expect { worker.perform(group_id, scan_profile_id) } + .not_to change { Security::ScanProfileProject.count } + end + + it 'does not raise an error' do + expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error + end + end + + context 'when scan profile does not exist' do + let(:scan_profile_id) { non_existing_record_id } + + it 'exits gracefully without creating records' do + expect { worker.perform(group_id, scan_profile_id) } + .not_to change { Security::ScanProfileProject.count } + end + + it 'does not raise an error' do + expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error + end + end + + context 'when scan profile belongs to a different root namespace' do + let_it_be(:other_root_group) { create(:group) } + let_it_be(:other_scan_profile) do + create(:security_scan_profile, namespace: other_root_group, scan_type: :secret_detection) + end + + let(:scan_profile_id) { other_scan_profile.id } + + it 'exits gracefully without creating records' do + expect { worker.perform(group_id, scan_profile_id) } + .not_to change { Security::ScanProfileProject.count } + end + + it 'does not raise an error' do + expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error + end + end + + context 'when the group has no projects' do + let_it_be(:empty_group) { create(:group, parent: root_group) } + let(:group) { empty_group } + + it 'completes without errors' do + expect { worker.perform(group_id, scan_profile_id) } + .not_to change { Security::ScanProfileProject.count } + end + + it 'does not raise an error' do + expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error + end + end + + context 'when processing large number of projects' do + let_it_be(:large_group) { create(:group) } + let_it_be(:large_scan_profile) do + create(:security_scan_profile, namespace: large_group, scan_type: :secret_detection) + end + + let(:group) { large_group } + let(:scan_profile_id) { large_scan_profile.id } + + it 'uses insert_all in batches' do + # Mock the project IDs to simulate 1200 projects without actually creating them + project_ids = (1..1200).to_a + batch_executor = instance_double(Gitlab::Database::NamespaceProjectIdsEachBatch, execute: project_ids) + allow(Gitlab::Database::NamespaceProjectIdsEachBatch).to receive(:new) + .and_return(batch_executor) + + # Should call insert_all 3 times (1200 / 500 = 2.4, rounded up to 3) + expect(Security::ScanProfileProject).to receive(:insert_all).exactly(3).times.and_call_original + + worker.perform(group_id, scan_profile_id) + end + + it 'uses NamespaceProjectIdsEachBatch to collect project IDs' do + expect(Gitlab::Database::NamespaceProjectIdsEachBatch).to receive(:new) + .with(group_id: group.id, batch_size: 500) + .and_call_original + + worker.perform(group_id, scan_profile_id) + end + end + end +end -- GitLab From f111bf1d5e17baac525677cd3c60f0ab130e3495 Mon Sep 17 00:00:00 2001 From: Nicolae Rotaru Date: Fri, 5 Dec 2025 10:32:08 +0100 Subject: [PATCH 02/10] Address feedback: refactor the specs --- ee/app/workers/all_queues.yml | 2 +- .../security/scan_profiles/attach_worker.rb | 2 +- .../scan_profiles/attach_worker_spec.rb | 57 ++++++------------- 3 files changed, 18 insertions(+), 43 deletions(-) diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml index f388038df881d4..e7f4791f294b58 100644 --- a/ee/app/workers/all_queues.yml +++ b/ee/app/workers/all_queues.yml @@ -4128,7 +4128,7 @@ :queue_namespace: - :name: security_scan_profiles_attach :worker_name: Security::ScanProfiles::AttachWorker - :feature_category: :security_policy_management + :feature_category: :security_asset_inventories :has_external_dependencies: false :urgency: :low :resource_boundary: :unknown diff --git a/ee/app/workers/security/scan_profiles/attach_worker.rb b/ee/app/workers/security/scan_profiles/attach_worker.rb index 268060055b1a44..232841d48a1ea5 100644 --- a/ee/app/workers/security/scan_profiles/attach_worker.rb +++ b/ee/app/workers/security/scan_profiles/attach_worker.rb @@ -11,7 +11,7 @@ class AttachWorker concurrency_limit -> { 200 } - feature_category :security_policy_management + feature_category :security_asset_inventories BATCH_SIZE = 500 diff --git a/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb index 36c6398ac43021..80ae4ada1f04a0 100644 --- a/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb +++ b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb @@ -2,7 +2,7 @@ require 'spec_helper' -RSpec.describe Security::ScanProfiles::AttachWorker, feature_category: :security_policy_management do +RSpec.describe Security::ScanProfiles::AttachWorker, feature_category: :security_asset_inventories do let_it_be(:root_group) { create(:group) } let_it_be(:subgroup) { create(:group, parent: root_group) } let_it_be(:nested_subgroup) { create(:group, parent: subgroup) } @@ -21,6 +21,17 @@ subject(:worker) { described_class.new } + shared_examples 'exits gracefully without creating records' do + it 'does not create records' do + expect { worker.perform(group_id, scan_profile_id) } + .not_to change { Security::ScanProfileProject.count } + end + + it 'does not raise an error' do + expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error + end + end + describe '#perform' do it 'attaches the scan profile to all projects in the group hierarchy' do expect { worker.perform(group_id, scan_profile_id) } @@ -71,27 +82,13 @@ context 'when group does not exist' do let(:group_id) { non_existing_record_id } - it 'exits gracefully without creating records' do - expect { worker.perform(group_id, scan_profile_id) } - .not_to change { Security::ScanProfileProject.count } - end - - it 'does not raise an error' do - expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error - end + it_behaves_like 'exits gracefully without creating records' end context 'when scan profile does not exist' do let(:scan_profile_id) { non_existing_record_id } - it 'exits gracefully without creating records' do - expect { worker.perform(group_id, scan_profile_id) } - .not_to change { Security::ScanProfileProject.count } - end - - it 'does not raise an error' do - expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error - end + it_behaves_like 'exits gracefully without creating records' end context 'when scan profile belongs to a different root namespace' do @@ -102,28 +99,14 @@ let(:scan_profile_id) { other_scan_profile.id } - it 'exits gracefully without creating records' do - expect { worker.perform(group_id, scan_profile_id) } - .not_to change { Security::ScanProfileProject.count } - end - - it 'does not raise an error' do - expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error - end + it_behaves_like 'exits gracefully without creating records' end context 'when the group has no projects' do let_it_be(:empty_group) { create(:group, parent: root_group) } let(:group) { empty_group } - it 'completes without errors' do - expect { worker.perform(group_id, scan_profile_id) } - .not_to change { Security::ScanProfileProject.count } - end - - it 'does not raise an error' do - expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error - end + it_behaves_like 'exits gracefully without creating records' end context 'when processing large number of projects' do @@ -147,14 +130,6 @@ worker.perform(group_id, scan_profile_id) end - - it 'uses NamespaceProjectIdsEachBatch to collect project IDs' do - expect(Gitlab::Database::NamespaceProjectIdsEachBatch).to receive(:new) - .with(group_id: group.id, batch_size: 500) - .and_call_original - - worker.perform(group_id, scan_profile_id) - end end end end -- GitLab From be2aee7e97d4eff69ef086163a28a0f24e42249d Mon Sep 17 00:00:00 2001 From: Nicolae Rotaru Date: Mon, 8 Dec 2025 09:27:39 +0100 Subject: [PATCH 03/10] Add missing specs --- .../scan_profiles/attach_worker_spec.rb | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb index 80ae4ada1f04a0..bc342e2f9d05fb 100644 --- a/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb +++ b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb @@ -131,5 +131,36 @@ worker.perform(group_id, scan_profile_id) end end + + context 'when an error occurs during bulk insert' do + before do + allow_next_instance_of(Gitlab::Database::NamespaceProjectIdsEachBatch) do |instance| + allow(instance).to receive(:execute).and_raise(StandardError, 'Database connection error') + end + end + + it 'tracks the exception with context' do + expect(Gitlab::ErrorTracking).to receive(:track_exception) + .with( + an_instance_of(StandardError), + { + group_id: group.id, + scan_profile_id: scan_profile.id, + project_ids_count: nil + } + ) + + worker.perform(group_id, scan_profile_id) + end + + it 'does not raise an error' do + expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error + end + + it 'does not create any records' do + expect { worker.perform(group_id, scan_profile_id) } + .not_to change { Security::ScanProfileProject.count } + end + end end end -- GitLab From 87183917b3a3e10e88c0a046f47a95a84c93ee89 Mon Sep 17 00:00:00 2001 From: Nicolae Rotaru Date: Mon, 8 Dec 2025 14:42:05 +0100 Subject: [PATCH 04/10] Add missing specs --- .../scan_profiles/attach_worker_spec.rb | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb index bc342e2f9d05fb..f174f821e602c4 100644 --- a/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb +++ b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb @@ -162,5 +162,30 @@ .not_to change { Security::ScanProfileProject.count } end end + + context 'when an error occurs before project_ids is assigned' do + before do + allow(Gitlab::Database::NamespaceProjectIdsEachBatch).to receive(:new) + .and_raise(StandardError, 'Initialization error') + end + + it 'tracks the exception with nil project_ids_count' do + expect(Gitlab::ErrorTracking).to receive(:track_exception) + .with( + an_instance_of(StandardError), + { + group_id: group.id, + scan_profile_id: scan_profile.id, + project_ids_count: nil + } + ) + + worker.perform(group_id, scan_profile_id) + end + + it 'does not raise an error' do + expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error + end + end end end -- GitLab From d6e78bf4573741f7c63263b57398bfebaace78da Mon Sep 17 00:00:00 2001 From: Nicolae Rotaru Date: Tue, 9 Dec 2025 10:07:41 +0100 Subject: [PATCH 05/10] Address the feedback: refactor --- .../security/scan_profiles/attach_service.rb | 78 +++++++ .../security/scan_profiles/attach_worker.rb | 44 +--- .../scan_profiles/attach_service_spec.rb | 194 ++++++++++++++++++ .../scan_profiles/attach_worker_spec.rb | 144 ++----------- 4 files changed, 285 insertions(+), 175 deletions(-) create mode 100644 ee/app/services/security/scan_profiles/attach_service.rb create mode 100644 ee/spec/services/security/scan_profiles/attach_service_spec.rb diff --git a/ee/app/services/security/scan_profiles/attach_service.rb b/ee/app/services/security/scan_profiles/attach_service.rb new file mode 100644 index 00000000000000..5687d6fe2f0d35 --- /dev/null +++ b/ee/app/services/security/scan_profiles/attach_service.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +module Security + module ScanProfiles + class AttachService + include BaseServiceUtility + + BATCH_SIZE = 500 + + def initialize(group, scan_profile) + @group = group + @scan_profile = scan_profile + end + + def execute + return error('Group not found') unless group + return error('Scan profile not found') unless scan_profile + return error('Scan profile does not belong to group hierarchy') unless valid_namespace? + + attach_profile_to_projects + success + rescue StandardError => e + Gitlab::ErrorTracking.track_exception(e, { + group_id: group.id, + scan_profile_id: scan_profile.id + }) + error('Failed to attach scan profile to projects') + end + + private + + attr_reader :group, :scan_profile + + def valid_namespace? + scan_profile.namespace_id == group.root_ancestor.id + end + + def attach_profile_to_projects + project_ids = fetch_project_ids + + return if project_ids.empty? + + bulk_insert_profile_projects(project_ids) + end + + def fetch_project_ids + Gitlab::Database::NamespaceProjectIdsEachBatch.new( + group_id: group.id, + batch_size: BATCH_SIZE + ).execute + end + + def bulk_insert_profile_projects(project_ids) + timestamp = Time.current + + # Sort project IDs to ensure consistent lock ordering and prevent deadlocks + # when multiple workers process the same or overlapping groups concurrently + sorted_project_ids = project_ids.sort + + sorted_project_ids.each_slice(BATCH_SIZE) do |batch_ids| + attributes = batch_ids.map do |project_id| + { + project_id: project_id, + security_scan_profile_id: scan_profile.id, + created_at: timestamp, + updated_at: timestamp + } + end + + Security::ScanProfileProject.insert_all( + attributes, + unique_by: [:project_id, :security_scan_profile_id] + ) + end + end + end + end +end diff --git a/ee/app/workers/security/scan_profiles/attach_worker.rb b/ee/app/workers/security/scan_profiles/attach_worker.rb index 232841d48a1ea5..b7d865fa5f9c92 100644 --- a/ee/app/workers/security/scan_profiles/attach_worker.rb +++ b/ee/app/workers/security/scan_profiles/attach_worker.rb @@ -13,56 +13,14 @@ class AttachWorker feature_category :security_asset_inventories - BATCH_SIZE = 500 - def perform(group_id, scan_profile_id) group = Group.find_by_id(group_id) return unless group scan_profile = Security::ScanProfile.find_by_id(scan_profile_id) return unless scan_profile - return unless scan_profile.namespace_id == group.root_ancestor.id - - attach_profile_to_projects(group, scan_profile) - end - - private - - def attach_profile_to_projects(group, scan_profile) - project_ids = Gitlab::Database::NamespaceProjectIdsEachBatch.new( - group_id: group.id, - batch_size: BATCH_SIZE - ).execute - - return if project_ids.empty? - - bulk_insert_profile_projects(project_ids, scan_profile) - rescue StandardError => e - Gitlab::ErrorTracking.track_exception(e, { - group_id: group.id, - scan_profile_id: scan_profile.id, - project_ids_count: project_ids&.size - }) - end - - def bulk_insert_profile_projects(project_ids, scan_profile) - timestamp = Time.current - - project_ids.each_slice(BATCH_SIZE) do |batch_ids| - attributes = batch_ids.map do |project_id| - { - project_id: project_id, - security_scan_profile_id: scan_profile.id, - created_at: timestamp, - updated_at: timestamp - } - end - Security::ScanProfileProject.insert_all( - attributes, - unique_by: [:project_id, :security_scan_profile_id] - ) - end + Security::ScanProfiles::AttachService.new(group, scan_profile).execute end end end diff --git a/ee/spec/services/security/scan_profiles/attach_service_spec.rb b/ee/spec/services/security/scan_profiles/attach_service_spec.rb new file mode 100644 index 00000000000000..1fd7762f256eef --- /dev/null +++ b/ee/spec/services/security/scan_profiles/attach_service_spec.rb @@ -0,0 +1,194 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Security::ScanProfiles::AttachService, feature_category: :security_asset_inventories do + let_it_be(:root_group) { create(:group) } + let_it_be(:subgroup) { create(:group, parent: root_group) } + let_it_be(:nested_subgroup) { create(:group, parent: subgroup) } + + let_it_be(:project1) { create(:project, namespace: root_group) } + let_it_be(:project2) { create(:project, namespace: subgroup) } + let_it_be(:project3) { create(:project, namespace: nested_subgroup) } + + let_it_be(:scan_profile) do + create(:security_scan_profile, namespace: root_group, scan_type: :secret_detection) + end + + let(:group) { root_group } + let(:service) { described_class.new(group, scan_profile) } + + describe '#execute' do + it 'attaches the scan profile to all projects in the group hierarchy' do + expect { service.execute } + .to change { Security::ScanProfileProject.count }.by(3) + + expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) + .to be(true) + expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) + .to be(true) + expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) + .to be(true) + end + + it 'returns a success response' do + result = service.execute + + expect(result[:status]).to eq(:success) + end + + context 'when called for a subgroup' do + let(:group) { subgroup } + + it 'attaches the scan profile to projects in the subgroup and its descendants' do + expect { service.execute } + .to change { Security::ScanProfileProject.count }.by(2) + + expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) + .to be(false) + expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) + .to be(true) + expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) + .to be(true) + end + end + + context 'when the profile is already attached to some projects' do + before do + create(:security_scan_profile_project, project: project1, scan_profile: scan_profile) + end + + it 'is idempotent and does not create duplicate records' do + expect { service.execute } + .to change { Security::ScanProfileProject.count }.by(2) + + # Running again should not create duplicates + expect { service.execute } + .not_to change { Security::ScanProfileProject.count } + + expect(Security::ScanProfileProject.where(project: project1, security_scan_profile_id: scan_profile.id).count) + .to eq(1) + end + end + + context 'when group is nil' do + let(:group) { nil } + + it 'returns an error response' do + result = service.execute + + expect(result[:status]).to eq(:error) + expect(result[:message]).to eq('Group not found') + end + + it 'does not create any records' do + expect { service.execute } + .not_to change { Security::ScanProfileProject.count } + end + end + + context 'when scan profile is nil' do + let(:scan_profile) { nil } + + it 'returns an error response' do + result = described_class.new(group, nil).execute + + expect(result[:status]).to eq(:error) + expect(result[:message]).to eq('Scan profile not found') + end + + it 'does not create any records' do + expect { described_class.new(group, nil).execute } + .not_to change { Security::ScanProfileProject.count } + end + end + + context 'when scan profile belongs to a different root namespace' do + let_it_be(:other_root_group) { create(:group) } + let_it_be(:other_scan_profile) do + create(:security_scan_profile, namespace: other_root_group, scan_type: :secret_detection) + end + + let(:scan_profile) { other_scan_profile } + + it 'returns an error response' do + result = service.execute + + expect(result[:status]).to eq(:error) + expect(result[:message]).to eq('Scan profile does not belong to group hierarchy') + end + + it 'does not create any records' do + expect { service.execute } + .not_to change { Security::ScanProfileProject.count } + end + end + + context 'when the group has no projects' do + let_it_be(:empty_group) { create(:group, parent: root_group) } + let(:group) { empty_group } + + it 'returns a success response' do + result = service.execute + + expect(result[:status]).to eq(:success) + end + + it 'does not create any records' do + expect { service.execute } + .not_to change { Security::ScanProfileProject.count } + end + end + + context 'when an error occurs during attachment' do + before do + allow_next_instance_of(Gitlab::Database::NamespaceProjectIdsEachBatch) do |instance| + allow(instance).to receive(:execute).and_raise(StandardError, 'Database error') + end + end + + it 'tracks the exception with context' do + expect(Gitlab::ErrorTracking).to receive(:track_exception) + .with( + an_instance_of(StandardError), + { + group_id: group.id, + scan_profile_id: scan_profile.id + } + ) + + service.execute + end + + it 'returns an error response' do + result = service.execute + + expect(result[:status]).to eq(:error) + expect(result[:message]).to eq('Failed to attach scan profile to projects') + end + + it 'does not create any records' do + expect { service.execute } + .not_to change { Security::ScanProfileProject.count } + end + end + + context 'when project IDs are sorted to prevent deadlocks' do + it 'sorts project IDs before inserting' do + # Mock unsorted project IDs + unsorted_ids = [project3.id, project1.id, project2.id] + allow_next_instance_of(Gitlab::Database::NamespaceProjectIdsEachBatch) do |instance| + allow(instance).to receive(:execute).and_return(unsorted_ids) + end + + # Expect insert_all to be called with sorted IDs + expect(Security::ScanProfileProject).to receive(:insert_all) do |attributes, _options| + project_ids = attributes.pluck(:project_id) + expect(project_ids).to eq(unsorted_ids.sort) + end + + service.execute + end + end + end +end diff --git a/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb index f174f821e602c4..6480ba8594a9e0 100644 --- a/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb +++ b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb @@ -21,18 +21,15 @@ subject(:worker) { described_class.new } - shared_examples 'exits gracefully without creating records' do - it 'does not create records' do - expect { worker.perform(group_id, scan_profile_id) } - .not_to change { Security::ScanProfileProject.count } - end + describe '#perform' do + it 'delegates to the AttachService' do + expect_next_instance_of(Security::ScanProfiles::AttachService, group, scan_profile) do |service| + expect(service).to receive(:execute).and_call_original + end - it 'does not raise an error' do - expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error + worker.perform(group_id, scan_profile_id) end - end - describe '#perform' do it 'attaches the scan profile to all projects in the group hierarchy' do expect { worker.perform(group_id, scan_profile_id) } .to change { Security::ScanProfileProject.count }.by(3) @@ -45,115 +42,23 @@ .to be(true) end - context 'when called for a subgroup' do - let(:group) { subgroup } - - it 'attaches the scan profile to projects in the subgroup and its descendants' do - expect { worker.perform(group_id, scan_profile_id) } - .to change { Security::ScanProfileProject.count }.by(2) - - expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) - .to be(false) - expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) - .to be(true) - expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) - .to be(true) - end - end + context 'when group does not exist' do + let(:group_id) { non_existing_record_id } - context 'when the profile is already attached to some projects' do - before do - create(:security_scan_profile_project, project: project1, scan_profile: scan_profile) + it 'exits gracefully without raising an error' do + expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error end - it 'is idempotent and does not create duplicate records' do - expect { worker.perform(group_id, scan_profile_id) } - .to change { Security::ScanProfileProject.count }.by(2) - - # Running again should not create duplicates + it 'does not create any records' do expect { worker.perform(group_id, scan_profile_id) } .not_to change { Security::ScanProfileProject.count } - - expect(Security::ScanProfileProject.where(project: project1, security_scan_profile_id: scan_profile.id).count) - .to eq(1) end end - context 'when group does not exist' do - let(:group_id) { non_existing_record_id } - - it_behaves_like 'exits gracefully without creating records' - end - context 'when scan profile does not exist' do let(:scan_profile_id) { non_existing_record_id } - it_behaves_like 'exits gracefully without creating records' - end - - context 'when scan profile belongs to a different root namespace' do - let_it_be(:other_root_group) { create(:group) } - let_it_be(:other_scan_profile) do - create(:security_scan_profile, namespace: other_root_group, scan_type: :secret_detection) - end - - let(:scan_profile_id) { other_scan_profile.id } - - it_behaves_like 'exits gracefully without creating records' - end - - context 'when the group has no projects' do - let_it_be(:empty_group) { create(:group, parent: root_group) } - let(:group) { empty_group } - - it_behaves_like 'exits gracefully without creating records' - end - - context 'when processing large number of projects' do - let_it_be(:large_group) { create(:group) } - let_it_be(:large_scan_profile) do - create(:security_scan_profile, namespace: large_group, scan_type: :secret_detection) - end - - let(:group) { large_group } - let(:scan_profile_id) { large_scan_profile.id } - - it 'uses insert_all in batches' do - # Mock the project IDs to simulate 1200 projects without actually creating them - project_ids = (1..1200).to_a - batch_executor = instance_double(Gitlab::Database::NamespaceProjectIdsEachBatch, execute: project_ids) - allow(Gitlab::Database::NamespaceProjectIdsEachBatch).to receive(:new) - .and_return(batch_executor) - - # Should call insert_all 3 times (1200 / 500 = 2.4, rounded up to 3) - expect(Security::ScanProfileProject).to receive(:insert_all).exactly(3).times.and_call_original - - worker.perform(group_id, scan_profile_id) - end - end - - context 'when an error occurs during bulk insert' do - before do - allow_next_instance_of(Gitlab::Database::NamespaceProjectIdsEachBatch) do |instance| - allow(instance).to receive(:execute).and_raise(StandardError, 'Database connection error') - end - end - - it 'tracks the exception with context' do - expect(Gitlab::ErrorTracking).to receive(:track_exception) - .with( - an_instance_of(StandardError), - { - group_id: group.id, - scan_profile_id: scan_profile.id, - project_ids_count: nil - } - ) - - worker.perform(group_id, scan_profile_id) - end - - it 'does not raise an error' do + it 'exits gracefully without raising an error' do expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error end @@ -162,30 +67,5 @@ .not_to change { Security::ScanProfileProject.count } end end - - context 'when an error occurs before project_ids is assigned' do - before do - allow(Gitlab::Database::NamespaceProjectIdsEachBatch).to receive(:new) - .and_raise(StandardError, 'Initialization error') - end - - it 'tracks the exception with nil project_ids_count' do - expect(Gitlab::ErrorTracking).to receive(:track_exception) - .with( - an_instance_of(StandardError), - { - group_id: group.id, - scan_profile_id: scan_profile.id, - project_ids_count: nil - } - ) - - worker.perform(group_id, scan_profile_id) - end - - it 'does not raise an error' do - expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error - end - end end end -- GitLab From 6fcf56d200ae4902dbd043fcd0da8e7c7ad67260 Mon Sep 17 00:00:00 2001 From: Nicolae Rotaru Date: Tue, 9 Dec 2025 17:09:48 +0100 Subject: [PATCH 06/10] Implement exclusive lease --- .../security/scan_profiles/attach_service.rb | 46 +++++- .../process_project_transfer_events_worker.rb | 22 ++- .../scan_profiles/attach_service_spec.rb | 141 ++++++++++++++++-- ...ess_project_transfer_events_worker_spec.rb | 57 +++++++ 4 files changed, 245 insertions(+), 21 deletions(-) diff --git a/ee/app/services/security/scan_profiles/attach_service.rb b/ee/app/services/security/scan_profiles/attach_service.rb index 5687d6fe2f0d35..36d6015c6a3649 100644 --- a/ee/app/services/security/scan_profiles/attach_service.rb +++ b/ee/app/services/security/scan_profiles/attach_service.rb @@ -6,6 +6,8 @@ class AttachService include BaseServiceUtility BATCH_SIZE = 500 + LEASE_TIMEOUT = 5.minutes + RETRY_DELAY = 30.seconds def initialize(group, scan_profile) @group = group @@ -36,18 +38,46 @@ def valid_namespace? end def attach_profile_to_projects - project_ids = fetch_project_ids + namespace_ids = fetch_namespace_hierarchy_ids - return if project_ids.empty? + namespace_ids.each do |namespace_id| + process_namespace_with_lock(namespace_id) + end + end + + def fetch_namespace_hierarchy_ids + cursor = { current_id: group.id, depth: [group.id] } + iterator = Gitlab::Database::NamespaceEachBatch.new(namespace_class: Group, cursor: cursor) - bulk_insert_profile_projects(project_ids) + namespace_ids = [] + iterator.each_batch(of: BATCH_SIZE) { |ids| namespace_ids.concat(ids) } + namespace_ids end - def fetch_project_ids - Gitlab::Database::NamespaceProjectIdsEachBatch.new( - group_id: group.id, - batch_size: BATCH_SIZE - ).execute + def process_namespace_with_lock(namespace_id) + lease_key = "attach_scan_profile:namespace:#{namespace_id}" + lease = Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT.to_i) + + uuid = lease.try_obtain + + unless uuid + # Reschedule to retry later if we can't obtain the lock + Security::ScanProfiles::AttachWorker.perform_in(RETRY_DELAY, group.id, scan_profile.id) + return + end + + begin + project_ids = fetch_projects_for_namespace(namespace_id) + bulk_insert_profile_projects(project_ids) if project_ids.any? + ensure + Gitlab::ExclusiveLease.cancel(lease_key, uuid) + end + end + + def fetch_projects_for_namespace(namespace_id) + # Fetch projects directly for this specific namespace only + # This is called while holding a lock on the namespace + Project.in_namespace(namespace_id).pluck_primary_key end def bulk_insert_profile_projects(project_ids) diff --git a/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb b/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb index 04bf12caf18283..4b9776ba4915cd 100644 --- a/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb +++ b/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb @@ -9,17 +9,37 @@ class ProcessProjectTransferEventsWorker data_consistency :sticky feature_category :security_asset_inventories + LEASE_TIMEOUT = 5.minutes + def handle_event(event) return unless event.data[:old_root_namespace_id] != event.data[:new_root_namespace_id] project = Project.find_by_id(event.data[:project_id]) return unless project - remove_old_namespace_scan_profile_associations(project) + # Acquire lock on the old namespace to synchronize with attach operations + old_namespace_id = event.data[:old_namespace_id] + process_with_lock(old_namespace_id) do + remove_old_namespace_scan_profile_associations(project) + end end private + def process_with_lock(namespace_id) + lease_key = "attach_scan_profile:namespace:#{namespace_id}" + lease = Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT.to_i) + + uuid = lease.try_obtain + return unless uuid + + begin + yield + ensure + Gitlab::ExclusiveLease.cancel(lease_key, uuid) + end + end + def remove_old_namespace_scan_profile_associations(project) project.security_scan_profiles_projects.not_in_root_namespace(project.group&.root_ancestor).delete_all end diff --git a/ee/spec/services/security/scan_profiles/attach_service_spec.rb b/ee/spec/services/security/scan_profiles/attach_service_spec.rb index 1fd7762f256eef..ca4890e8f19e06 100644 --- a/ee/spec/services/security/scan_profiles/attach_service_spec.rb +++ b/ee/spec/services/security/scan_profiles/attach_service_spec.rb @@ -3,6 +3,8 @@ require 'spec_helper' RSpec.describe Security::ScanProfiles::AttachService, feature_category: :security_asset_inventories do + include ExclusiveLeaseHelpers + let_it_be(:root_group) { create(:group) } let_it_be(:subgroup) { create(:group, parent: root_group) } let_it_be(:nested_subgroup) { create(:group, parent: subgroup) } @@ -37,6 +39,32 @@ expect(result[:status]).to eq(:success) end + it 'acquires exclusive lease for each namespace before processing' do + namespace_ids = [root_group.id, subgroup.id, nested_subgroup.id] + + namespace_ids.each do |namespace_id| + lease_key = "attach_scan_profile:namespace:#{namespace_id}" + expect(Gitlab::ExclusiveLease).to receive(:new) + .with(lease_key, timeout: described_class::LEASE_TIMEOUT.to_i) + .and_call_original + end + + service.execute + end + + it 'releases the lease after processing each namespace' do + namespace_ids = [root_group.id, subgroup.id, nested_subgroup.id] + + namespace_ids.each do |namespace_id| + lease_key = "attach_scan_profile:namespace:#{namespace_id}" + expect(Gitlab::ExclusiveLease).to receive(:cancel) + .with(lease_key, anything) + .and_call_original + end + + service.execute + end + context 'when called for a subgroup' do let(:group) { subgroup } @@ -140,10 +168,105 @@ end end + context 'when exclusive lease cannot be obtained' do + before do + # Stub all namespaces in the hierarchy to simulate all locks being taken + [root_group.id, subgroup.id, nested_subgroup.id].each do |namespace_id| + lease_key = "attach_scan_profile:namespace:#{namespace_id}" + stub_exclusive_lease_taken(lease_key) + end + end + + it 'reschedules the worker to retry later' do + expect(Security::ScanProfiles::AttachWorker).to receive(:perform_in) + .with(described_class::RETRY_DELAY, group.id, scan_profile.id) + .at_least(:once) + + service.execute + end + + it 'does not process any namespace' do + allow(Security::ScanProfiles::AttachWorker).to receive(:perform_in) + + expect(Security::ScanProfileProject).not_to receive(:insert_all) + + service.execute + end + end + + context 'when lock is obtained for some namespaces but not others' do + before do + # Lock only the middle namespace (subgroup) + stub_exclusive_lease_taken("attach_scan_profile:namespace:#{subgroup.id}") + end + + it 'processes unlocked namespaces' do + allow(Security::ScanProfiles::AttachWorker).to receive(:perform_in) + + expect { service.execute } + .to change { Security::ScanProfileProject.count }.by(2) # root + nested only + + # Verify which projects were processed + expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) + .to be(true) # root_group processed + expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) + .to be(false) # subgroup locked, not processed + expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) + .to be(true) # nested_subgroup processed + end + + it 'reschedules worker for locked namespace' do + expect(Security::ScanProfiles::AttachWorker).to receive(:perform_in) + .with(described_class::RETRY_DELAY, group.id, scan_profile.id) + .once + + service.execute + end + + it 'still returns success' do + allow(Security::ScanProfiles::AttachWorker).to receive(:perform_in) + + result = service.execute + expect(result[:status]).to eq(:success) + end + end + + context 'when error occurs after obtaining lock' do + before do + # Simulate error during project fetching after lock is obtained + allow(Project).to receive(:in_namespace).and_raise(StandardError, 'Query error') + end + + it 'releases the lock even when error occurs' do + lease_key = "attach_scan_profile:namespace:#{root_group.id}" + + expect(Gitlab::ExclusiveLease).to receive(:cancel) + .with(lease_key, anything) + .and_call_original + + service.execute + end + + it 'tracks the exception and returns error response' do + expect(Gitlab::ErrorTracking).to receive(:track_exception) + .with( + an_instance_of(StandardError), + { + group_id: group.id, + scan_profile_id: scan_profile.id + } + ) + + result = service.execute + expect(result[:status]).to eq(:error) + expect(result[:message]).to eq('Failed to attach scan profile to projects') + end + end + context 'when an error occurs during attachment' do before do - allow_next_instance_of(Gitlab::Database::NamespaceProjectIdsEachBatch) do |instance| - allow(instance).to receive(:execute).and_raise(StandardError, 'Database error') + allow_next_instance_of(Gitlab::Database::NamespaceEachBatch) do |instance| + allow(instance).to receive(:each_batch).and_raise(StandardError, 'Database error') end end @@ -175,17 +298,11 @@ context 'when project IDs are sorted to prevent deadlocks' do it 'sorts project IDs before inserting' do - # Mock unsorted project IDs - unsorted_ids = [project3.id, project1.id, project2.id] - allow_next_instance_of(Gitlab::Database::NamespaceProjectIdsEachBatch) do |instance| - allow(instance).to receive(:execute).and_return(unsorted_ids) - end - - # Expect insert_all to be called with sorted IDs - expect(Security::ScanProfileProject).to receive(:insert_all) do |attributes, _options| + # Expect insert_all to be called with sorted IDs for each namespace + allow(Security::ScanProfileProject).to receive(:insert_all) do |attributes, _options| project_ids = attributes.pluck(:project_id) - expect(project_ids).to eq(unsorted_ids.sort) - end + expect(project_ids).to eq(project_ids.sort) + end.at_least(:once) service.execute end diff --git a/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb b/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb index 877b072e955329..1ba1f637bbf656 100644 --- a/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb +++ b/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb @@ -3,6 +3,8 @@ require 'spec_helper' RSpec.describe Security::ScanProfiles::ProcessProjectTransferEventsWorker, feature_category: :security_asset_inventories do + include ExclusiveLeaseHelpers + let_it_be(:old_namespace) { create(:group) } let_it_be(:new_namespace) { create(:group) } @@ -39,6 +41,61 @@ subject(:handle_event) { worker.handle_event(project_event) } describe '#handle_event' do + it 'acquires exclusive lease before processing' do + project.update!(namespace: new_namespace) + + lease_key = "attach_scan_profile:namespace:#{old_namespace.id}" + expect(Gitlab::ExclusiveLease).to receive(:new) + .with(lease_key, timeout: described_class::LEASE_TIMEOUT.to_i) + .and_call_original + + handle_event + end + + it 'releases the lease after processing' do + project.update!(namespace: new_namespace) + + lease_key = "attach_scan_profile:namespace:#{old_namespace.id}" + expect(Gitlab::ExclusiveLease).to receive(:cancel) + .with(lease_key, anything) + .and_call_original + + handle_event + end + + context 'when exclusive lease cannot be obtained' do + before do + project.update!(namespace: new_namespace) + lease_key = "attach_scan_profile:namespace:#{old_namespace.id}" + stub_exclusive_lease_taken(lease_key) + end + + it 'does not process the transfer' do + expect { handle_event }.not_to change { Security::ScanProfileProject.count } + end + end + + context 'when error occurs during deletion after lock is obtained' do + before do + project.update!(namespace: new_namespace) + + # Simulate error during deletion + allow_next_found_instance_of(Project) do |instance| + allow(instance).to receive(:security_scan_profiles_projects).and_raise(StandardError, 'Database error') + end + end + + it 'releases the lock even when error occurs' do + lease_key = "attach_scan_profile:namespace:#{old_namespace.id}" + + expect(Gitlab::ExclusiveLease).to receive(:cancel) + .with(lease_key, anything) + .and_call_original + + expect { handle_event }.to raise_error(StandardError, 'Database error') + end + end + context 'when there is no project associated with the event' do let(:project_id) { non_existing_record_id } -- GitLab From c31351d79001cf8e392ad70182271167b048f03b Mon Sep 17 00:00:00 2001 From: Nicolae Rotaru Date: Thu, 11 Dec 2025 09:24:32 +0100 Subject: [PATCH 07/10] Use Gitlab::ExclusiveLeaseHelpers with in_lock --- .../security/scan_profiles/attach_service.rb | 16 ++------ .../process_project_transfer_events_worker.rb | 12 +++--- .../scan_profiles/attach_service_spec.rb | 40 +++++-------------- ...ess_project_transfer_events_worker_spec.rb | 38 ++---------------- 4 files changed, 21 insertions(+), 85 deletions(-) diff --git a/ee/app/services/security/scan_profiles/attach_service.rb b/ee/app/services/security/scan_profiles/attach_service.rb index 36d6015c6a3649..d526aef6498953 100644 --- a/ee/app/services/security/scan_profiles/attach_service.rb +++ b/ee/app/services/security/scan_profiles/attach_service.rb @@ -4,6 +4,7 @@ module Security module ScanProfiles class AttachService include BaseServiceUtility + include Gitlab::ExclusiveLeaseHelpers BATCH_SIZE = 500 LEASE_TIMEOUT = 5.minutes @@ -56,22 +57,13 @@ def fetch_namespace_hierarchy_ids def process_namespace_with_lock(namespace_id) lease_key = "attach_scan_profile:namespace:#{namespace_id}" - lease = Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT.to_i) - uuid = lease.try_obtain - - unless uuid - # Reschedule to retry later if we can't obtain the lock - Security::ScanProfiles::AttachWorker.perform_in(RETRY_DELAY, group.id, scan_profile.id) - return - end - - begin + in_lock(lease_key, ttl: LEASE_TIMEOUT, retries: 0) do project_ids = fetch_projects_for_namespace(namespace_id) bulk_insert_profile_projects(project_ids) if project_ids.any? - ensure - Gitlab::ExclusiveLease.cancel(lease_key, uuid) end + rescue FailedToObtainLockError + Security::ScanProfiles::AttachWorker.perform_in(RETRY_DELAY, group.id, scan_profile.id) end def fetch_projects_for_namespace(namespace_id) diff --git a/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb b/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb index 4b9776ba4915cd..18d1ef8a77fefb 100644 --- a/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb +++ b/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb @@ -4,6 +4,7 @@ module Security module ScanProfiles class ProcessProjectTransferEventsWorker include Gitlab::EventStore::Subscriber + include Gitlab::ExclusiveLeaseHelpers idempotent! data_consistency :sticky @@ -28,16 +29,13 @@ def handle_event(event) def process_with_lock(namespace_id) lease_key = "attach_scan_profile:namespace:#{namespace_id}" - lease = Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT.to_i) - uuid = lease.try_obtain - return unless uuid - - begin + in_lock(lease_key, ttl: LEASE_TIMEOUT, retries: 0) do yield - ensure - Gitlab::ExclusiveLease.cancel(lease_key, uuid) end + rescue FailedToObtainLockError + # Skip processing if lock unavailable - idempotent, will be handled eventually + nil end def remove_old_namespace_scan_profile_associations(project) diff --git a/ee/spec/services/security/scan_profiles/attach_service_spec.rb b/ee/spec/services/security/scan_profiles/attach_service_spec.rb index ca4890e8f19e06..a006dad682766e 100644 --- a/ee/spec/services/security/scan_profiles/attach_service_spec.rb +++ b/ee/spec/services/security/scan_profiles/attach_service_spec.rb @@ -39,26 +39,13 @@ expect(result[:status]).to eq(:success) end - it 'acquires exclusive lease for each namespace before processing' do + it 'uses in_lock for each namespace' do namespace_ids = [root_group.id, subgroup.id, nested_subgroup.id] namespace_ids.each do |namespace_id| lease_key = "attach_scan_profile:namespace:#{namespace_id}" - expect(Gitlab::ExclusiveLease).to receive(:new) - .with(lease_key, timeout: described_class::LEASE_TIMEOUT.to_i) - .and_call_original - end - - service.execute - end - - it 'releases the lease after processing each namespace' do - namespace_ids = [root_group.id, subgroup.id, nested_subgroup.id] - - namespace_ids.each do |namespace_id| - lease_key = "attach_scan_profile:namespace:#{namespace_id}" - expect(Gitlab::ExclusiveLease).to receive(:cancel) - .with(lease_key, anything) + expect(service).to receive(:in_lock) + .with(lease_key, ttl: described_class::LEASE_TIMEOUT, retries: 0) .and_call_original end @@ -196,15 +183,16 @@ context 'when lock is obtained for some namespaces but not others' do before do - # Lock only the middle namespace (subgroup) + # Lock only the middle namespace (subgroup) and nested_subgroup stub_exclusive_lease_taken("attach_scan_profile:namespace:#{subgroup.id}") + stub_exclusive_lease_taken("attach_scan_profile:namespace:#{nested_subgroup.id}") end it 'processes unlocked namespaces' do allow(Security::ScanProfiles::AttachWorker).to receive(:perform_in) expect { service.execute } - .to change { Security::ScanProfileProject.count }.by(2) # root + nested only + .to change { Security::ScanProfileProject.count }.by(1) # root only # Verify which projects were processed expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) @@ -212,13 +200,13 @@ expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) .to be(false) # subgroup locked, not processed expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) - .to be(true) # nested_subgroup processed + .to be(false) # nested_subgroup locked, not processed end - it 'reschedules worker for locked namespace' do + it 'reschedules worker for locked namespaces' do expect(Security::ScanProfiles::AttachWorker).to receive(:perform_in) .with(described_class::RETRY_DELAY, group.id, scan_profile.id) - .once + .twice service.execute end @@ -237,16 +225,6 @@ allow(Project).to receive(:in_namespace).and_raise(StandardError, 'Query error') end - it 'releases the lock even when error occurs' do - lease_key = "attach_scan_profile:namespace:#{root_group.id}" - - expect(Gitlab::ExclusiveLease).to receive(:cancel) - .with(lease_key, anything) - .and_call_original - - service.execute - end - it 'tracks the exception and returns error response' do expect(Gitlab::ErrorTracking).to receive(:track_exception) .with( diff --git a/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb b/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb index 1ba1f637bbf656..0d2129089a1bf0 100644 --- a/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb +++ b/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb @@ -41,23 +41,12 @@ subject(:handle_event) { worker.handle_event(project_event) } describe '#handle_event' do - it 'acquires exclusive lease before processing' do + it 'uses in_lock for processing' do project.update!(namespace: new_namespace) lease_key = "attach_scan_profile:namespace:#{old_namespace.id}" - expect(Gitlab::ExclusiveLease).to receive(:new) - .with(lease_key, timeout: described_class::LEASE_TIMEOUT.to_i) - .and_call_original - - handle_event - end - - it 'releases the lease after processing' do - project.update!(namespace: new_namespace) - - lease_key = "attach_scan_profile:namespace:#{old_namespace.id}" - expect(Gitlab::ExclusiveLease).to receive(:cancel) - .with(lease_key, anything) + expect(worker).to receive(:in_lock) + .with(lease_key, ttl: described_class::LEASE_TIMEOUT, retries: 0) .and_call_original handle_event @@ -75,27 +64,6 @@ end end - context 'when error occurs during deletion after lock is obtained' do - before do - project.update!(namespace: new_namespace) - - # Simulate error during deletion - allow_next_found_instance_of(Project) do |instance| - allow(instance).to receive(:security_scan_profiles_projects).and_raise(StandardError, 'Database error') - end - end - - it 'releases the lock even when error occurs' do - lease_key = "attach_scan_profile:namespace:#{old_namespace.id}" - - expect(Gitlab::ExclusiveLease).to receive(:cancel) - .with(lease_key, anything) - .and_call_original - - expect { handle_event }.to raise_error(StandardError, 'Database error') - end - end - context 'when there is no project associated with the event' do let(:project_id) { non_existing_record_id } -- GitLab From a818b767587c6f57e10479c917131d94c9c58a13 Mon Sep 17 00:00:00 2001 From: Nicolae Rotaru Date: Mon, 15 Dec 2025 10:57:40 +0100 Subject: [PATCH 08/10] Address the feedback --- .../security/scan_profiles/attach_service.rb | 55 ++++---- .../security/scan_profiles/attach_worker.rb | 4 +- .../process_project_transfer_events_worker.rb | 12 +- .../scan_profiles/attach_service_spec.rb | 125 +++++++++--------- ...ess_project_transfer_events_worker_spec.rb | 4 +- 5 files changed, 101 insertions(+), 99 deletions(-) diff --git a/ee/app/services/security/scan_profiles/attach_service.rb b/ee/app/services/security/scan_profiles/attach_service.rb index d526aef6498953..a7676e226e1d64 100644 --- a/ee/app/services/security/scan_profiles/attach_service.rb +++ b/ee/app/services/security/scan_profiles/attach_service.rb @@ -10,14 +10,13 @@ class AttachService LEASE_TIMEOUT = 5.minutes RETRY_DELAY = 30.seconds - def initialize(group, scan_profile) + def initialize(group, scan_profile, traverse_hierarchy: true) @group = group @scan_profile = scan_profile + @traverse_hierarchy = traverse_hierarchy end def execute - return error('Group not found') unless group - return error('Scan profile not found') unless scan_profile return error('Scan profile does not belong to group hierarchy') unless valid_namespace? attach_profile_to_projects @@ -32,54 +31,60 @@ def execute private - attr_reader :group, :scan_profile + attr_reader :group, :scan_profile, :traverse_hierarchy def valid_namespace? scan_profile.namespace_id == group.root_ancestor.id end def attach_profile_to_projects - namespace_ids = fetch_namespace_hierarchy_ids - - namespace_ids.each do |namespace_id| + each_namespace_id do |namespace_id| process_namespace_with_lock(namespace_id) end end - def fetch_namespace_hierarchy_ids - cursor = { current_id: group.id, depth: [group.id] } - iterator = Gitlab::Database::NamespaceEachBatch.new(namespace_class: Group, cursor: cursor) + def each_namespace_id + if traverse_hierarchy + # Traverse entire hierarchy: process group and all descendants + cursor = { current_id: group.id, depth: [group.id] } + iterator = Gitlab::Database::NamespaceEachBatch.new(namespace_class: Group, cursor: cursor) - namespace_ids = [] - iterator.each_batch(of: BATCH_SIZE) { |ids| namespace_ids.concat(ids) } - namespace_ids + iterator.each_batch(of: BATCH_SIZE) do |ids| + ids.each { |id| yield id } + end + else + # Only process the specific group: used when rescheduling to avoid re-traversing + yield group.id + end end def process_namespace_with_lock(namespace_id) - lease_key = "attach_scan_profile:namespace:#{namespace_id}" + lease_key = "update_scan_profile:namespace:#{group.root_ancestor.id}" in_lock(lease_key, ttl: LEASE_TIMEOUT, retries: 0) do - project_ids = fetch_projects_for_namespace(namespace_id) - bulk_insert_profile_projects(project_ids) if project_ids.any? + bulk_insert_profile_projects(project_ids_in_namespace(namespace_id)) end rescue FailedToObtainLockError - Security::ScanProfiles::AttachWorker.perform_in(RETRY_DELAY, group.id, scan_profile.id) + # Reschedule with traverse_hierarchy: false to avoid re-traversing the entire hierarchy + # This prevents redundant database queries and processing of already-processed namespaces + Security::ScanProfiles::AttachWorker.perform_in(RETRY_DELAY, group.id, scan_profile.id, false) end - def fetch_projects_for_namespace(namespace_id) + def project_ids_in_namespace(namespace_id) # Fetch projects directly for this specific namespace only - # This is called while holding a lock on the namespace - Project.in_namespace(namespace_id).pluck_primary_key + # This is called while holding a lock on the root namespace + # + # Sort project IDs to ensure consistent lock ordering and prevent deadlocks + # when multiple workers process the same or overlapping groups concurrently + Project.in_namespace(namespace_id).order_by_primary_key.pluck_primary_key end def bulk_insert_profile_projects(project_ids) - timestamp = Time.current + return if project_ids.blank? - # Sort project IDs to ensure consistent lock ordering and prevent deadlocks - # when multiple workers process the same or overlapping groups concurrently - sorted_project_ids = project_ids.sort + timestamp = Time.current - sorted_project_ids.each_slice(BATCH_SIZE) do |batch_ids| + project_ids.each_slice(BATCH_SIZE) do |batch_ids| attributes = batch_ids.map do |project_id| { project_id: project_id, diff --git a/ee/app/workers/security/scan_profiles/attach_worker.rb b/ee/app/workers/security/scan_profiles/attach_worker.rb index b7d865fa5f9c92..d7034cc33eb3f5 100644 --- a/ee/app/workers/security/scan_profiles/attach_worker.rb +++ b/ee/app/workers/security/scan_profiles/attach_worker.rb @@ -13,14 +13,14 @@ class AttachWorker feature_category :security_asset_inventories - def perform(group_id, scan_profile_id) + def perform(group_id, scan_profile_id, traverse_hierarchy = true) group = Group.find_by_id(group_id) return unless group scan_profile = Security::ScanProfile.find_by_id(scan_profile_id) return unless scan_profile - Security::ScanProfiles::AttachService.new(group, scan_profile).execute + Security::ScanProfiles::AttachService.new(group, scan_profile, traverse_hierarchy: traverse_hierarchy).execute end end end diff --git a/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb b/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb index 18d1ef8a77fefb..079bf518c71c51 100644 --- a/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb +++ b/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb @@ -13,22 +13,22 @@ class ProcessProjectTransferEventsWorker LEASE_TIMEOUT = 5.minutes def handle_event(event) - return unless event.data[:old_root_namespace_id] != event.data[:new_root_namespace_id] + return if event.data[:old_root_namespace_id] == event.data[:new_root_namespace_id] project = Project.find_by_id(event.data[:project_id]) return unless project - # Acquire lock on the old namespace to synchronize with attach operations - old_namespace_id = event.data[:old_namespace_id] - process_with_lock(old_namespace_id) do + # Acquire lock on the old root namespace to synchronize with attach operations + old_root_namespace_id = event.data[:old_root_namespace_id] + process_with_lock(old_root_namespace_id) do remove_old_namespace_scan_profile_associations(project) end end private - def process_with_lock(namespace_id) - lease_key = "attach_scan_profile:namespace:#{namespace_id}" + def process_with_lock(root_namespace_id) + lease_key = "update_scan_profile:namespace:#{root_namespace_id}" in_lock(lease_key, ttl: LEASE_TIMEOUT, retries: 0) do yield diff --git a/ee/spec/services/security/scan_profiles/attach_service_spec.rb b/ee/spec/services/security/scan_profiles/attach_service_spec.rb index a006dad682766e..0689a0b18d4bf4 100644 --- a/ee/spec/services/security/scan_profiles/attach_service_spec.rb +++ b/ee/spec/services/security/scan_profiles/attach_service_spec.rb @@ -40,14 +40,10 @@ end it 'uses in_lock for each namespace' do - namespace_ids = [root_group.id, subgroup.id, nested_subgroup.id] - - namespace_ids.each do |namespace_id| - lease_key = "attach_scan_profile:namespace:#{namespace_id}" - expect(service).to receive(:in_lock) - .with(lease_key, ttl: described_class::LEASE_TIMEOUT, retries: 0) - .and_call_original - end + expect(service).to receive(:in_lock) + .with("update_scan_profile:namespace:#{root_group.id}", ttl: described_class::LEASE_TIMEOUT, retries: 0) + .and_call_original + .at_least(:once) service.execute end @@ -86,38 +82,6 @@ end end - context 'when group is nil' do - let(:group) { nil } - - it 'returns an error response' do - result = service.execute - - expect(result[:status]).to eq(:error) - expect(result[:message]).to eq('Group not found') - end - - it 'does not create any records' do - expect { service.execute } - .not_to change { Security::ScanProfileProject.count } - end - end - - context 'when scan profile is nil' do - let(:scan_profile) { nil } - - it 'returns an error response' do - result = described_class.new(group, nil).execute - - expect(result[:status]).to eq(:error) - expect(result[:message]).to eq('Scan profile not found') - end - - it 'does not create any records' do - expect { described_class.new(group, nil).execute } - .not_to change { Security::ScanProfileProject.count } - end - end - context 'when scan profile belongs to a different root namespace' do let_it_be(:other_root_group) { create(:group) } let_it_be(:other_scan_profile) do @@ -157,16 +121,13 @@ context 'when exclusive lease cannot be obtained' do before do - # Stub all namespaces in the hierarchy to simulate all locks being taken - [root_group.id, subgroup.id, nested_subgroup.id].each do |namespace_id| - lease_key = "attach_scan_profile:namespace:#{namespace_id}" - stub_exclusive_lease_taken(lease_key) - end + # Stub the root namespace lock as taken + stub_exclusive_lease_taken("update_scan_profile:namespace:#{root_group.id}") end it 'reschedules the worker to retry later' do expect(Security::ScanProfiles::AttachWorker).to receive(:perform_in) - .with(described_class::RETRY_DELAY, group.id, scan_profile.id) + .with(described_class::RETRY_DELAY, group.id, scan_profile.id, false) .at_least(:once) service.execute @@ -182,38 +143,28 @@ end context 'when lock is obtained for some namespaces but not others' do - before do - # Lock only the middle namespace (subgroup) and nested_subgroup - stub_exclusive_lease_taken("attach_scan_profile:namespace:#{subgroup.id}") - stub_exclusive_lease_taken("attach_scan_profile:namespace:#{nested_subgroup.id}") - end - - it 'processes unlocked namespaces' do + it 'processes all namespaces when lock is obtained' do allow(Security::ScanProfiles::AttachWorker).to receive(:perform_in) expect { service.execute } - .to change { Security::ScanProfileProject.count }.by(1) # root only + .to change { Security::ScanProfileProject.count }.by(3) # all projects - # Verify which projects were processed + # Verify all projects were processed expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) - .to be(true) # root_group processed + .to be(true) expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) - .to be(false) # subgroup locked, not processed + .to be(true) expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) - .to be(false) # nested_subgroup locked, not processed + .to be(true) end - it 'reschedules worker for locked namespaces' do - expect(Security::ScanProfiles::AttachWorker).to receive(:perform_in) - .with(described_class::RETRY_DELAY, group.id, scan_profile.id) - .twice + it 'does not reschedule when lock is obtained' do + expect(Security::ScanProfiles::AttachWorker).not_to receive(:perform_in) service.execute end it 'still returns success' do - allow(Security::ScanProfiles::AttachWorker).to receive(:perform_in) - result = service.execute expect(result[:status]).to eq(:success) end @@ -285,5 +236,51 @@ service.execute end end + + context 'when traverse_hierarchy is false' do + let(:service) { described_class.new(group, scan_profile, traverse_hierarchy: false) } + + it 'only processes the specific group, not descendants' do + expect { service.execute } + .to change { Security::ScanProfileProject.count }.by(1) # only project1 + + expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) + .to be(true) + expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) + .to be(false) + expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) + .to be(false) + end + + context 'when called for a subgroup' do + let(:group) { subgroup } + + it 'only processes the subgroup, not its descendants' do + expect { service.execute } + .to change { Security::ScanProfileProject.count }.by(1) # only project2 + + expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) + .to be(false) + expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) + .to be(true) + expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) + .to be(false) + end + end + end + + context 'when reschedule happens' do + before do + stub_exclusive_lease_taken("update_scan_profile:namespace:#{root_group.id}") + end + + it 'reschedules with traverse_hierarchy: false' do + expect(Security::ScanProfiles::AttachWorker).to receive(:perform_in) + .with(described_class::RETRY_DELAY, group.id, scan_profile.id, false) + .at_least(:once) + + service.execute + end + end end end diff --git a/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb b/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb index 0d2129089a1bf0..a2158f1e05e3ec 100644 --- a/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb +++ b/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb @@ -44,7 +44,7 @@ it 'uses in_lock for processing' do project.update!(namespace: new_namespace) - lease_key = "attach_scan_profile:namespace:#{old_namespace.id}" + lease_key = "update_scan_profile:namespace:#{old_namespace.id}" expect(worker).to receive(:in_lock) .with(lease_key, ttl: described_class::LEASE_TIMEOUT, retries: 0) .and_call_original @@ -55,7 +55,7 @@ context 'when exclusive lease cannot be obtained' do before do project.update!(namespace: new_namespace) - lease_key = "attach_scan_profile:namespace:#{old_namespace.id}" + lease_key = "update_scan_profile:namespace:#{old_namespace.id}" stub_exclusive_lease_taken(lease_key) end -- GitLab From 379024962d9ba5953f64f5ffa16f0427b29a161e Mon Sep 17 00:00:00 2001 From: Nicolae Rotaru Date: Mon, 15 Dec 2025 14:28:44 +0100 Subject: [PATCH 09/10] Implement batching to prevent timeouts --- .../security/scan_profiles/attach_service.rb | 71 +++++++++---------- 1 file changed, 32 insertions(+), 39 deletions(-) diff --git a/ee/app/services/security/scan_profiles/attach_service.rb b/ee/app/services/security/scan_profiles/attach_service.rb index a7676e226e1d64..69b2196546bb10 100644 --- a/ee/app/services/security/scan_profiles/attach_service.rb +++ b/ee/app/services/security/scan_profiles/attach_service.rb @@ -38,23 +38,17 @@ def valid_namespace? end def attach_profile_to_projects - each_namespace_id do |namespace_id| - process_namespace_with_lock(namespace_id) - end - end - - def each_namespace_id if traverse_hierarchy - # Traverse entire hierarchy: process group and all descendants - cursor = { current_id: group.id, depth: [group.id] } - iterator = Gitlab::Database::NamespaceEachBatch.new(namespace_class: Group, cursor: cursor) + iterator = Gitlab::Database::NamespaceProjectIdsEachBatch.new( + group_id: group.id, + batch_size: BATCH_SIZE + ) - iterator.each_batch(of: BATCH_SIZE) do |ids| - ids.each { |id| yield id } + iterator.subgroup_ids.each do |namespace_id| + process_namespace_with_lock(namespace_id) end else - # Only process the specific group: used when rescheduling to avoid re-traversing - yield group.id + process_namespace_with_lock(group.id) end end @@ -62,43 +56,42 @@ def process_namespace_with_lock(namespace_id) lease_key = "update_scan_profile:namespace:#{group.root_ancestor.id}" in_lock(lease_key, ttl: LEASE_TIMEOUT, retries: 0) do - bulk_insert_profile_projects(project_ids_in_namespace(namespace_id)) + each_project_id_in_namespace(namespace_id) do |batch_ids| + bulk_insert_profile_projects(batch_ids) + end end rescue FailedToObtainLockError - # Reschedule with traverse_hierarchy: false to avoid re-traversing the entire hierarchy - # This prevents redundant database queries and processing of already-processed namespaces Security::ScanProfiles::AttachWorker.perform_in(RETRY_DELAY, group.id, scan_profile.id, false) end - def project_ids_in_namespace(namespace_id) - # Fetch projects directly for this specific namespace only - # This is called while holding a lock on the root namespace - # - # Sort project IDs to ensure consistent lock ordering and prevent deadlocks - # when multiple workers process the same or overlapping groups concurrently - Project.in_namespace(namespace_id).order_by_primary_key.pluck_primary_key + def each_project_id_in_namespace(namespace_id) + relation = Project.in_namespace(namespace_id).order_by_primary_key + + relation.each_batch(of: BATCH_SIZE) do |batch| + # Sort project IDs to ensure consistent lock ordering and prevent deadlocks + # when multiple workers process the same or overlapping groups concurrently + yield batch.pluck_primary_key.sort + end end - def bulk_insert_profile_projects(project_ids) - return if project_ids.blank? + def bulk_insert_profile_projects(batch_ids) + return if batch_ids.blank? timestamp = Time.current - project_ids.each_slice(BATCH_SIZE) do |batch_ids| - attributes = batch_ids.map do |project_id| - { - project_id: project_id, - security_scan_profile_id: scan_profile.id, - created_at: timestamp, - updated_at: timestamp - } - end - - Security::ScanProfileProject.insert_all( - attributes, - unique_by: [:project_id, :security_scan_profile_id] - ) + attributes = batch_ids.map do |project_id| + { + project_id: project_id, + security_scan_profile_id: scan_profile.id, + created_at: timestamp, + updated_at: timestamp + } end + + Security::ScanProfileProject.insert_all( + attributes, + unique_by: [:project_id, :security_scan_profile_id] + ) end end end -- GitLab From 00acca51676a848c95689ce5d7e92c21ebb980b9 Mon Sep 17 00:00:00 2001 From: Nicolae Rotaru Date: Tue, 16 Dec 2025 16:04:25 +0100 Subject: [PATCH 10/10] Refactor the locking --- .../security/scan_profiles/attach_service.rb | 28 ++--- .../process_project_transfer_events_worker.rb | 10 +- .../scan_profiles/attach_service_spec.rb | 106 +++++++++++------- .../scan_profiles/attach_worker_spec.rb | 7 +- 4 files changed, 90 insertions(+), 61 deletions(-) diff --git a/ee/app/services/security/scan_profiles/attach_service.rb b/ee/app/services/security/scan_profiles/attach_service.rb index 69b2196546bb10..d9a4468460d451 100644 --- a/ee/app/services/security/scan_profiles/attach_service.rb +++ b/ee/app/services/security/scan_profiles/attach_service.rb @@ -53,25 +53,27 @@ def attach_profile_to_projects end def process_namespace_with_lock(namespace_id) - lease_key = "update_scan_profile:namespace:#{group.root_ancestor.id}" + # Process projects in batches, acquiring lock per batch + # The lock must cover both the project ID query AND the insert + # to prevent race conditions with project transfers + relation = Project.in_namespace(namespace_id).order_by_primary_key - in_lock(lease_key, ttl: LEASE_TIMEOUT, retries: 0) do - each_project_id_in_namespace(namespace_id) do |batch_ids| - bulk_insert_profile_projects(batch_ids) - end + relation.each_batch(of: BATCH_SIZE) do |batch| + insert_batch_with_lock(namespace_id, batch) end - rescue FailedToObtainLockError - Security::ScanProfiles::AttachWorker.perform_in(RETRY_DELAY, group.id, scan_profile.id, false) end - def each_project_id_in_namespace(namespace_id) - relation = Project.in_namespace(namespace_id).order_by_primary_key + def insert_batch_with_lock(namespace_id, batch) + lease_key = "update_scan_profile:namespace:#{namespace_id}" - relation.each_batch(of: BATCH_SIZE) do |batch| - # Sort project IDs to ensure consistent lock ordering and prevent deadlocks - # when multiple workers process the same or overlapping groups concurrently - yield batch.pluck_primary_key.sort + in_lock(lease_key, ttl: LEASE_TIMEOUT, retries: 0) do + # Query project IDs within the lock to prevent race conditions + # where a project is moved between querying and inserting + batch_ids = batch.pluck_primary_key.sort + bulk_insert_profile_projects(batch_ids) end + rescue FailedToObtainLockError + Security::ScanProfiles::AttachWorker.perform_in(RETRY_DELAY, group.id, scan_profile.id, false) end def bulk_insert_profile_projects(batch_ids) diff --git a/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb b/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb index 079bf518c71c51..ab8d8cca9b4d71 100644 --- a/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb +++ b/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb @@ -18,17 +18,17 @@ def handle_event(event) project = Project.find_by_id(event.data[:project_id]) return unless project - # Acquire lock on the old root namespace to synchronize with attach operations - old_root_namespace_id = event.data[:old_root_namespace_id] - process_with_lock(old_root_namespace_id) do + # Acquire lock on the old namespace to synchronize with attach operations + old_namespace_id = event.data[:old_namespace_id] + process_with_lock(old_namespace_id) do remove_old_namespace_scan_profile_associations(project) end end private - def process_with_lock(root_namespace_id) - lease_key = "update_scan_profile:namespace:#{root_namespace_id}" + def process_with_lock(namespace_id) + lease_key = "update_scan_profile:namespace:#{namespace_id}" in_lock(lease_key, ttl: LEASE_TIMEOUT, retries: 0) do yield diff --git a/ee/spec/services/security/scan_profiles/attach_service_spec.rb b/ee/spec/services/security/scan_profiles/attach_service_spec.rb index 0689a0b18d4bf4..b024886b549400 100644 --- a/ee/spec/services/security/scan_profiles/attach_service_spec.rb +++ b/ee/spec/services/security/scan_profiles/attach_service_spec.rb @@ -25,12 +25,13 @@ expect { service.execute } .to change { Security::ScanProfileProject.count }.by(3) - expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) - .to be(true) - expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) - .to be(true) - expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) - .to be(true) + association1 = Security::ScanProfileProject.find_by(project: project1, security_scan_profile_id: scan_profile.id) + association2 = Security::ScanProfileProject.find_by(project: project2, security_scan_profile_id: scan_profile.id) + association3 = Security::ScanProfileProject.find_by(project: project3, security_scan_profile_id: scan_profile.id) + + expect(association1).to be_persisted + expect(association2).to be_persisted + expect(association3).to be_persisted end it 'returns a success response' do @@ -39,9 +40,10 @@ expect(result[:status]).to eq(:success) end - it 'uses in_lock for each namespace' do + it 'uses in_lock for each namespace batch' do + # With per-namespace locking, we expect locks on each namespace being processed expect(service).to receive(:in_lock) - .with("update_scan_profile:namespace:#{root_group.id}", ttl: described_class::LEASE_TIMEOUT, retries: 0) + .with(a_string_matching(/update_scan_profile:namespace:\d+/), ttl: described_class::LEASE_TIMEOUT, retries: 0) .and_call_original .at_least(:once) @@ -55,12 +57,16 @@ expect { service.execute } .to change { Security::ScanProfileProject.count }.by(2) - expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) - .to be(false) - expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) - .to be(true) - expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) - .to be(true) + association1 = Security::ScanProfileProject.find_by(project: project1, + security_scan_profile_id: scan_profile.id) + association2 = Security::ScanProfileProject.find_by(project: project2, + security_scan_profile_id: scan_profile.id) + association3 = Security::ScanProfileProject.find_by(project: project3, + security_scan_profile_id: scan_profile.id) + + expect(association1).to be_nil + expect(association2).to be_persisted + expect(association3).to be_persisted end end @@ -121,8 +127,10 @@ context 'when exclusive lease cannot be obtained' do before do - # Stub the root namespace lock as taken - stub_exclusive_lease_taken("update_scan_profile:namespace:#{root_group.id}") + # Stub all namespace locks as taken (root_group, subgroup, nested_subgroup) + [root_group.id, subgroup.id, nested_subgroup.id].each do |namespace_id| + stub_exclusive_lease_taken("update_scan_profile:namespace:#{namespace_id}") + end end it 'reschedules the worker to retry later' do @@ -147,15 +155,18 @@ allow(Security::ScanProfiles::AttachWorker).to receive(:perform_in) expect { service.execute } - .to change { Security::ScanProfileProject.count }.by(3) # all projects - - # Verify all projects were processed - expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) - .to be(true) - expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) - .to be(true) - expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) - .to be(true) + .to change { Security::ScanProfileProject.count }.by(3) + + association1 = Security::ScanProfileProject.find_by(project: project1, + security_scan_profile_id: scan_profile.id) + association2 = Security::ScanProfileProject.find_by(project: project2, + security_scan_profile_id: scan_profile.id) + association3 = Security::ScanProfileProject.find_by(project: project3, + security_scan_profile_id: scan_profile.id) + + expect(association1).to be_persisted + expect(association2).to be_persisted + expect(association3).to be_persisted end it 'does not reschedule when lock is obtained' do @@ -242,14 +253,18 @@ it 'only processes the specific group, not descendants' do expect { service.execute } - .to change { Security::ScanProfileProject.count }.by(1) # only project1 - - expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) - .to be(true) - expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) - .to be(false) - expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) - .to be(false) + .to change { Security::ScanProfileProject.count }.by(1) + + association1 = Security::ScanProfileProject.find_by(project: project1, + security_scan_profile_id: scan_profile.id) + association2 = Security::ScanProfileProject.find_by(project: project2, + security_scan_profile_id: scan_profile.id) + association3 = Security::ScanProfileProject.find_by(project: project3, + security_scan_profile_id: scan_profile.id) + + expect(association1).to be_persisted + expect(association2).to be_nil + expect(association3).to be_nil end context 'when called for a subgroup' do @@ -257,21 +272,28 @@ it 'only processes the subgroup, not its descendants' do expect { service.execute } - .to change { Security::ScanProfileProject.count }.by(1) # only project2 - - expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id)) - .to be(false) - expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id)) - .to be(true) - expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id)) - .to be(false) + .to change { Security::ScanProfileProject.count }.by(1) + + association1 = Security::ScanProfileProject.find_by(project: project1, + security_scan_profile_id: scan_profile.id) + association2 = Security::ScanProfileProject.find_by(project: project2, + security_scan_profile_id: scan_profile.id) + association3 = Security::ScanProfileProject.find_by(project: project3, + security_scan_profile_id: scan_profile.id) + + expect(association1).to be_nil + expect(association2).to be_persisted + expect(association3).to be_nil end end end context 'when reschedule happens' do before do - stub_exclusive_lease_taken("update_scan_profile:namespace:#{root_group.id}") + # Stub all namespace locks as taken + [root_group.id, subgroup.id, nested_subgroup.id].each do |namespace_id| + stub_exclusive_lease_taken("update_scan_profile:namespace:#{namespace_id}") + end end it 'reschedules with traverse_hierarchy: false' do diff --git a/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb index 6480ba8594a9e0..ad373b87307038 100644 --- a/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb +++ b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb @@ -23,7 +23,12 @@ describe '#perform' do it 'delegates to the AttachService' do - expect_next_instance_of(Security::ScanProfiles::AttachService, group, scan_profile) do |service| + expect_next_instance_of( + Security::ScanProfiles::AttachService, + group, + scan_profile, + traverse_hierarchy: true + ) do |service| expect(service).to receive(:execute).and_call_original end -- GitLab