diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml
index 0ddc86629373dc0a91970a39338bf6d47b0ed50a..e59f4e98d923b2c480114e53304924b3b1cca641 100644
--- a/config/sidekiq_queues.yml
+++ b/config/sidekiq_queues.yml
@@ -1065,6 +1065,8 @@
   - 1
 - - security_scan_execution_policies_rule_schedule
   - 1
+- - security_scan_profiles_attach
+  - 1
 - - security_scan_profiles_process_project_transfer_events
   - 1
 - - security_scan_result_policies_add_approvers_to_rules
diff --git a/ee/app/services/security/scan_profiles/attach_service.rb b/ee/app/services/security/scan_profiles/attach_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..d9a4468460d451ae503b2769fa50399f54c04be4
--- /dev/null
+++ b/ee/app/services/security/scan_profiles/attach_service.rb
@@ -0,0 +1,125 @@
+# frozen_string_literal: true
+
+module Security
+  module ScanProfiles
+    # Attaches a scan profile to every project of a group and, by default, of all of
+    # its descendant groups, by bulk-inserting Security::ScanProfileProject rows.
+    #
+    # Projects are processed per namespace, one batch at a time, while holding an
+    # exclusive lease keyed by namespace id. ProcessProjectTransferEventsWorker takes
+    # the same lease key, so attaching and transfer cleanup for one namespace do not
+    # run concurrently. When the lease is busy, only that namespace is retried later.
+    class AttachService
+      include BaseServiceUtility
+      include Gitlab::ExclusiveLeaseHelpers
+
+      BATCH_SIZE = 500
+      LEASE_TIMEOUT = 5.minutes
+      RETRY_DELAY = 30.seconds
+
+      # @param group [Group] group whose projects receive the profile
+      # @param scan_profile [Security::ScanProfile] must belong to the group's root namespace
+      # @param traverse_hierarchy [Boolean] when false, only projects directly in +group+ are processed
+      def initialize(group, scan_profile, traverse_hierarchy: true)
+        @group = group
+        @scan_profile = scan_profile
+        @traverse_hierarchy = traverse_hierarchy
+      end
+
+      # @return [Hash] a success hash, or an error hash when the profile belongs to another
+      #   root namespace or an unexpected error occurs (the exception is tracked, not raised).
+      #   A busy namespace lease is not an error: that namespace is rescheduled instead.
+      def execute
+        return error('Scan profile does not belong to group hierarchy') unless valid_namespace?
+
+        attach_profile_to_projects
+        success
+      rescue StandardError => e
+        Gitlab::ErrorTracking.track_exception(e, {
+          group_id: group.id,
+          scan_profile_id: scan_profile.id
+        })
+        error('Failed to attach scan profile to projects')
+      end
+
+      private
+
+      attr_reader :group, :scan_profile, :traverse_hierarchy
+
+      def valid_namespace?
+        scan_profile.namespace_id == group.root_ancestor.id
+      end
+
+      def attach_profile_to_projects
+        if traverse_hierarchy
+          iterator = Gitlab::Database::NamespaceProjectIdsEachBatch.new(
+            group_id: group.id,
+            batch_size: BATCH_SIZE
+          )
+
+          iterator.subgroup_ids.each do |namespace_id|
+            process_namespace_with_lock(namespace_id)
+          end
+        else
+          process_namespace_with_lock(group.id)
+        end
+      end
+
+      def process_namespace_with_lock(namespace_id)
+        # Process projects in batches, acquiring the lock per batch.
+        # The lock must cover both the project ID query AND the insert
+        # to prevent race conditions with project transfers.
+        relation = Project.in_namespace(namespace_id).order_by_primary_key
+
+        relation.each_batch(of: BATCH_SIZE) do |batch|
+          # A busy lease means the whole namespace has already been rescheduled, so stop
+          # here instead of enqueueing one more retry job for every remaining batch.
+          break unless insert_batch_with_lock(namespace_id, batch)
+        end
+      end
+
+      # Inserts one batch while holding the namespace lease.
+      #
+      # Returns true when the batch was processed. Returns false when the lease is held
+      # elsewhere, after rescheduling only +namespace_id+ (without hierarchy traversal).
+      def insert_batch_with_lock(namespace_id, batch)
+        lease_key = "update_scan_profile:namespace:#{namespace_id}"
+
+        in_lock(lease_key, ttl: LEASE_TIMEOUT, retries: 0) do
+          # Query project IDs within the lock to prevent race conditions
+          # where a project is moved between querying and inserting
+          batch_ids = batch.pluck_primary_key.sort
+          bulk_insert_profile_projects(batch_ids)
+        end
+
+        true
+      rescue FailedToObtainLockError
+        # Retry the namespace whose lease was busy. Rescheduling group.id instead would
+        # only reprocess the top-level group and never retry a locked subgroup.
+        Security::ScanProfiles::AttachWorker.perform_in(RETRY_DELAY, namespace_id, scan_profile.id, false)
+
+        false
+      end
+
+      def bulk_insert_profile_projects(batch_ids)
+        return if batch_ids.blank?
+
+        timestamp = Time.current
+
+        attributes = batch_ids.map do |project_id|
+          {
+            project_id: project_id,
+            security_scan_profile_id: scan_profile.id,
+            created_at: timestamp,
+            updated_at: timestamp
+          }
+        end
+
+        Security::ScanProfileProject.insert_all(
+          attributes,
+          unique_by: [:project_id, :security_scan_profile_id]
+        )
+      end
+    end
+  end
+end
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index 73f03bdec9f9987974697cae0754bf1f7694eab5..cd6936def1b796cbc41f4f226b523c5f9a9a82d6 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -4166,6 +4166,16 @@
   :idempotent: false
   :tags: []
   :queue_namespace:
+- :name: security_scan_profiles_attach
+  :worker_name: Security::ScanProfiles::AttachWorker
+  :feature_category: :security_asset_inventories
+  :has_external_dependencies: false
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
+  :queue_namespace:
 - :name: security_scan_profiles_process_project_transfer_events
   :worker_name: Security::ScanProfiles::ProcessProjectTransferEventsWorker
   :feature_category: :security_asset_inventories
diff --git a/ee/app/workers/security/scan_profiles/attach_worker.rb b/ee/app/workers/security/scan_profiles/attach_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..d7034cc33eb3f5df8951c3a5feb5fcb7e9db0e28
--- /dev/null
+++ b/ee/app/workers/security/scan_profiles/attach_worker.rb
@@ -0,0 +1,32 @@
+# frozen_string_literal: true
+
+module Security
+  module ScanProfiles
+    # Sidekiq entry point for Security::ScanProfiles::AttachService. The service also
+    # enqueues this worker itself to retry a single namespace whose lease was busy.
+    class AttachWorker
+      include ApplicationWorker
+
+      data_consistency :sticky
+      idempotent!
+      deduplicate :until_executed
+
+      concurrency_limit -> { 200 }
+
+      feature_category :security_asset_inventories
+
+      # @param group_id [Integer] group to process; the job is a no-op if it no longer exists
+      # @param scan_profile_id [Integer] profile to attach; the job is a no-op if it no longer exists
+      # @param traverse_hierarchy [Boolean] false to process only +group_id+, not its descendants
+      def perform(group_id, scan_profile_id, traverse_hierarchy = true)
+        group = Group.find_by_id(group_id)
+        return unless group
+
+        scan_profile = Security::ScanProfile.find_by_id(scan_profile_id)
+        return unless scan_profile
+
+        Security::ScanProfiles::AttachService.new(group, scan_profile, traverse_hierarchy: traverse_hierarchy).execute
+      end
+    end
+  end
+end
diff --git a/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb b/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb
index 04bf12caf18283f8da9ae7d2056d38ded3b3bf4d..ab8d8cca9b4d71fa4c17509fa37de26e974c3d72 100644
--- a/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb
+++ b/ee/app/workers/security/scan_profiles/process_project_transfer_events_worker.rb
@@ -4,22 +4,40 @@ module Security
   module ScanProfiles
     class ProcessProjectTransferEventsWorker
       include Gitlab::EventStore::Subscriber
+      include Gitlab::ExclusiveLeaseHelpers
 
       idempotent!
       data_consistency :sticky
       feature_category :security_asset_inventories
 
+      LEASE_TIMEOUT = 5.minutes
+
       def handle_event(event)
-        return unless event.data[:old_root_namespace_id] != event.data[:new_root_namespace_id]
+        return if event.data[:old_root_namespace_id] == event.data[:new_root_namespace_id]
 
         project = Project.find_by_id(event.data[:project_id])
         return unless project
 
-        remove_old_namespace_scan_profile_associations(project)
+        # Acquire lock on the old namespace to synchronize with attach operations
+        old_namespace_id = event.data[:old_namespace_id]
+        process_with_lock(old_namespace_id) do
+          remove_old_namespace_scan_profile_associations(project)
+        end
       end
 
       private
 
+      def process_with_lock(namespace_id)
+        lease_key = "update_scan_profile:namespace:#{namespace_id}"
+
+        # FailedToObtainLockError is deliberately not rescued: nothing else removes the
+        # stale associations, so let the job fail while another job holds this namespace's
+        # lease and rely on Sidekiq retrying this idempotent worker.
+        in_lock(lease_key, ttl: LEASE_TIMEOUT, retries: 0) do
+          yield
+        end
+      end
+
       def remove_old_namespace_scan_profile_associations(project)
         project.security_scan_profiles_projects.not_in_root_namespace(project.group&.root_ancestor).delete_all
       end
diff --git a/ee/spec/services/security/scan_profiles/attach_service_spec.rb b/ee/spec/services/security/scan_profiles/attach_service_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..6b356eef11f5a90de01c1b97850b51a253e58f63
--- /dev/null
+++ b/ee/spec/services/security/scan_profiles/attach_service_spec.rb
@@ -0,0 +1,374 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Security::ScanProfiles::AttachService, feature_category: :security_asset_inventories do
+  include ExclusiveLeaseHelpers
+
+  let_it_be(:root_group) { create(:group) }
+  let_it_be(:subgroup) { create(:group, parent: root_group) }
+  let_it_be(:nested_subgroup) { create(:group, parent: subgroup) }
+
+  let_it_be(:project1) { create(:project, namespace: root_group) }
+  let_it_be(:project2) { create(:project, namespace: subgroup) }
+  let_it_be(:project3) { create(:project, namespace: nested_subgroup) }
+
+  let_it_be(:scan_profile) do
+    create(:security_scan_profile, namespace: root_group, scan_type: :secret_detection)
+  end
+
+  let(:group) { root_group }
+  let(:service) { described_class.new(group, scan_profile) }
+
+  describe '#execute' do
+    it 'attaches the scan profile to all projects in the group hierarchy' do
+      expect { service.execute }
+        .to change { Security::ScanProfileProject.count }.by(3)
+
+      association1 = Security::ScanProfileProject.find_by(project: project1, security_scan_profile_id: scan_profile.id)
+      association2 = Security::ScanProfileProject.find_by(project: project2, security_scan_profile_id: scan_profile.id)
+      association3 = Security::ScanProfileProject.find_by(project: project3, security_scan_profile_id: scan_profile.id)
+
+      expect(association1).to be_persisted
+      expect(association2).to be_persisted
+      expect(association3).to be_persisted
+    end
+
+    it 'returns a success response' do
+      result = service.execute
+
+      expect(result[:status]).to eq(:success)
+    end
+
+    it 'uses in_lock for each namespace batch' do
+      # With per-namespace locking, we expect locks on each namespace being processed
+      expect(service).to receive(:in_lock)
+        .with(a_string_matching(/update_scan_profile:namespace:\d+/), ttl: described_class::LEASE_TIMEOUT, retries: 0)
+        .and_call_original
+        .at_least(:once)
+
+      service.execute
+    end
+
+    context 'when called for a subgroup' do
+      let(:group) { subgroup }
+
+      it 'attaches the scan profile to projects in the subgroup and its descendants' do
+        expect { service.execute }
+          .to change { Security::ScanProfileProject.count }.by(2)
+
+        association1 = Security::ScanProfileProject.find_by(project: project1,
+          security_scan_profile_id: scan_profile.id)
+        association2 = Security::ScanProfileProject.find_by(project: project2,
+          security_scan_profile_id: scan_profile.id)
+        association3 = Security::ScanProfileProject.find_by(project: project3,
+          security_scan_profile_id: scan_profile.id)
+
+        expect(association1).to be_nil
+        expect(association2).to be_persisted
+        expect(association3).to be_persisted
+      end
+    end
+
+    context 'when the profile is already attached to some projects' do
+      before do
+        create(:security_scan_profile_project, project: project1, scan_profile: scan_profile)
+      end
+
+      it 'is idempotent and does not create duplicate records' do
+        expect { service.execute }
+          .to change { Security::ScanProfileProject.count }.by(2)
+
+        # Running again should not create duplicates
+        expect { service.execute }
+          .not_to change { Security::ScanProfileProject.count }
+
+        expect(Security::ScanProfileProject.where(project: project1, security_scan_profile_id: scan_profile.id).count)
+          .to eq(1)
+      end
+    end
+
+    context 'when scan profile belongs to a different root namespace' do
+      let_it_be(:other_root_group) { create(:group) }
+      let_it_be(:other_scan_profile) do
+        create(:security_scan_profile, namespace: other_root_group, scan_type: :secret_detection)
+      end
+
+      let(:scan_profile) { other_scan_profile }
+
+      it 'returns an error response' do
+        result = service.execute
+
+        expect(result[:status]).to eq(:error)
+        expect(result[:message]).to eq('Scan profile does not belong to group hierarchy')
+      end
+
+      it 'does not create any records' do
+        expect { service.execute }
+          .not_to change { Security::ScanProfileProject.count }
+      end
+    end
+
+    context 'when the group has no projects' do
+      let_it_be(:empty_group) { create(:group, parent: root_group) }
+      let(:group) { empty_group }
+
+      it 'returns a success response' do
+        result = service.execute
+
+        expect(result[:status]).to eq(:success)
+      end
+
+      it 'does not create any records' do
+        expect { service.execute }
+          .not_to change { Security::ScanProfileProject.count }
+      end
+    end
+
+    context 'when exclusive lease cannot be obtained' do
+      before do
+        # Stub all namespace locks as taken (root_group, subgroup, nested_subgroup)
+        [root_group.id, subgroup.id, nested_subgroup.id].each do |namespace_id|
+          stub_exclusive_lease_taken("update_scan_profile:namespace:#{namespace_id}")
+        end
+      end
+
+      it 'reschedules each locked namespace individually' do
+        [root_group, subgroup, nested_subgroup].each do |namespace|
+          expect(Security::ScanProfiles::AttachWorker).to receive(:perform_in)
+            .with(described_class::RETRY_DELAY, namespace.id, scan_profile.id, false)
+            .once
+        end
+
+        service.execute
+      end
+
+      it 'does not process any namespace' do
+        allow(Security::ScanProfiles::AttachWorker).to receive(:perform_in)
+
+        expect(Security::ScanProfileProject).not_to receive(:insert_all)
+
+        service.execute
+      end
+    end
+
+    context 'when lock is obtained for some namespaces but not others' do
+      before do
+        stub_exclusive_lease_taken("update_scan_profile:namespace:#{subgroup.id}")
+      end
+
+      it 'processes the namespaces whose lock was obtained' do
+        allow(Security::ScanProfiles::AttachWorker).to receive(:perform_in)
+
+        expect { service.execute }
+          .to change { Security::ScanProfileProject.count }.by(2)
+
+        association1 = Security::ScanProfileProject.find_by(project: project1,
+          security_scan_profile_id: scan_profile.id)
+        association2 = Security::ScanProfileProject.find_by(project: project2,
+          security_scan_profile_id: scan_profile.id)
+        association3 = Security::ScanProfileProject.find_by(project: project3,
+          security_scan_profile_id: scan_profile.id)
+
+        expect(association1).to be_persisted
+        expect(association2).to be_nil
+        expect(association3).to be_persisted
+      end
+
+      it 'reschedules only the namespace whose lock was not obtained' do
+        expect(Security::ScanProfiles::AttachWorker).to receive(:perform_in)
+          .with(described_class::RETRY_DELAY, subgroup.id, scan_profile.id, false)
+          .once
+
+        service.execute
+      end
+
+      it 'still returns success' do
+        allow(Security::ScanProfiles::AttachWorker).to receive(:perform_in)
+
+        result = service.execute
+        expect(result[:status]).to eq(:success)
+      end
+    end
+
+    context 'when locks are obtained for all namespaces' do
+      it 'does not reschedule' do
+        expect(Security::ScanProfiles::AttachWorker).not_to receive(:perform_in)
+
+        service.execute
+      end
+    end
+
+    context 'when error occurs after obtaining lock' do
+      before do
+        # Simulate error during project fetching after lock is obtained
+        allow(Project).to receive(:in_namespace).and_raise(StandardError, 'Query error')
+      end
+
+      it 'tracks the exception and returns error response' do
+        expect(Gitlab::ErrorTracking).to receive(:track_exception)
+          .with(
+            an_instance_of(StandardError),
+            {
+              group_id: group.id,
+              scan_profile_id: scan_profile.id
+            }
+          )
+
+        result = service.execute
+        expect(result[:status]).to eq(:error)
+        expect(result[:message]).to eq('Failed to attach scan profile to projects')
+      end
+    end
+
+    context 'when an error occurs during attachment' do
+      before do
+        allow_next_instance_of(Gitlab::Database::NamespaceEachBatch) do |instance|
+          allow(instance).to receive(:each_batch).and_raise(StandardError, 'Database error')
+        end
+      end
+
+      it 'tracks the exception with context' do
+        expect(Gitlab::ErrorTracking).to receive(:track_exception)
+          .with(
+            an_instance_of(StandardError),
+            {
+              group_id: group.id,
+              scan_profile_id: scan_profile.id
+            }
+          )
+
+        service.execute
+      end
+
+      it 'returns an error response' do
+        result = service.execute
+
+        expect(result[:status]).to eq(:error)
+        expect(result[:message]).to eq('Failed to attach scan profile to projects')
+      end
+
+      it 'does not create any records' do
+        expect { service.execute }
+          .not_to change { Security::ScanProfileProject.count }
+      end
+    end
+
+    context 'when project IDs are sorted to prevent deadlocks' do
+      it 'sorts project IDs before inserting' do
+        # Expect insert_all to be called with sorted IDs for each namespace
+        allow(Security::ScanProfileProject).to receive(:insert_all) do |attributes, _options|
+          project_ids = attributes.pluck(:project_id)
+          expect(project_ids).to eq(project_ids.sort)
+        end.at_least(:once)
+
+        service.execute
+      end
+    end
+
+    context 'when traverse_hierarchy is false' do
+      let(:service) { described_class.new(group, scan_profile, traverse_hierarchy: false) }
+
+      it 'only processes the specific group, not descendants' do
+        expect { service.execute }
+          .to change { Security::ScanProfileProject.count }.by(1)
+
+        association1 = Security::ScanProfileProject.find_by(project: project1,
+          security_scan_profile_id: scan_profile.id)
+        association2 = Security::ScanProfileProject.find_by(project: project2,
+          security_scan_profile_id: scan_profile.id)
+        association3 = Security::ScanProfileProject.find_by(project: project3,
+          security_scan_profile_id: scan_profile.id)
+
+        expect(association1).to be_persisted
+        expect(association2).to be_nil
+        expect(association3).to be_nil
+      end
+
+      context 'when called for a subgroup' do
+        let(:group) { subgroup }
+
+        it 'only processes the subgroup, not its descendants' do
+          expect { service.execute }
+            .to change { Security::ScanProfileProject.count }.by(1)
+
+          association1 = Security::ScanProfileProject.find_by(project: project1,
+            security_scan_profile_id: scan_profile.id)
+          association2 = Security::ScanProfileProject.find_by(project: project2,
+            security_scan_profile_id: scan_profile.id)
+          association3 = Security::ScanProfileProject.find_by(project: project3,
+            security_scan_profile_id: scan_profile.id)
+
+          expect(association1).to be_nil
+          expect(association2).to be_persisted
+          expect(association3).to be_nil
+        end
+      end
+    end
+
+    context 'when reschedule happens' do
+      before do
+        # Stub all namespace locks as taken
+        [root_group.id, subgroup.id, nested_subgroup.id].each do |namespace_id|
+          stub_exclusive_lease_taken("update_scan_profile:namespace:#{namespace_id}")
+        end
+
+        allow(Security::ScanProfiles::AttachWorker).to receive(:perform_in)
+      end
+
+      it 'reschedules every locked namespace with traverse_hierarchy: false' do
+        service.execute
+
+        [root_group, subgroup, nested_subgroup].each do |namespace|
+          expect(Security::ScanProfiles::AttachWorker).to have_received(:perform_in)
+            .with(described_class::RETRY_DELAY, namespace.id, scan_profile.id, false)
+        end
+      end
+
+      it 'reschedules a locked namespace only once even when it spans several batches' do
+        stub_const("#{described_class}::BATCH_SIZE", 1)
+        create(:project, namespace: subgroup)
+
+        service.execute
+
+        expect(Security::ScanProfiles::AttachWorker).to have_received(:perform_in)
+          .with(described_class::RETRY_DELAY, subgroup.id, scan_profile.id, false)
+          .once
+      end
+    end
+  end
+
+  describe '#bulk_insert_profile_projects' do
+    it 'does not insert records when batch_ids is empty' do
+      expect(Security::ScanProfileProject).not_to receive(:insert_all)
+
+      service.send(:bulk_insert_profile_projects, [])
+    end
+
+    it 'does not insert records when batch_ids is nil' do
+      expect(Security::ScanProfileProject).not_to receive(:insert_all)
+
+      service.send(:bulk_insert_profile_projects, nil)
+    end
+
+    it 'inserts records with correct attributes when batch_ids is provided' do
+      batch_ids = [project1.id, project2.id]
+
+      expect(Security::ScanProfileProject).to receive(:insert_all).with(
+        array_including(
+          hash_including(
+            project_id: project1.id,
+            security_scan_profile_id: scan_profile.id
+          ),
+          hash_including(
+            project_id: project2.id,
+            security_scan_profile_id: scan_profile.id
+          )
+        ),
+        unique_by: [:project_id, :security_scan_profile_id]
+      )
+
+      service.send(:bulk_insert_profile_projects, batch_ids)
+    end
+  end
+end
diff --git a/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..ad373b87307038673e452c1b59658b37fc5503c8
--- /dev/null
+++ b/ee/spec/workers/security/scan_profiles/attach_worker_spec.rb
@@ -0,0 +1,83 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Security::ScanProfiles::AttachWorker, feature_category: :security_asset_inventories do
+  let_it_be(:root_group) { create(:group) }
+  let_it_be(:subgroup) { create(:group, parent: root_group) }
+  let_it_be(:nested_subgroup) { create(:group, parent: subgroup) }
+
+  let_it_be(:project1) { create(:project, namespace: root_group) }
+  let_it_be(:project2) { create(:project, namespace: subgroup) }
+  let_it_be(:project3) { create(:project, namespace: nested_subgroup) }
+
+  let_it_be(:scan_profile) do
+    create(:security_scan_profile, namespace: root_group, scan_type: :secret_detection)
+  end
+
+  let(:group) { root_group }
+  let(:group_id) { group.id }
+  let(:scan_profile_id) { scan_profile.id }
+
+  subject(:worker) { described_class.new }
+
+  describe '#perform' do
+    it 'delegates to the AttachService' do
+      expect_next_instance_of(
+        Security::ScanProfiles::AttachService,
+        group,
+        scan_profile,
+        traverse_hierarchy: true
+      ) do |service|
+        expect(service).to receive(:execute).and_call_original
+      end
+
+      worker.perform(group_id, scan_profile_id)
+    end
+
+    it 'attaches the scan profile to all projects in the group hierarchy' do
+      expect { worker.perform(group_id, scan_profile_id) }
+        .to change { Security::ScanProfileProject.count }.by(3)
+
+      expect(Security::ScanProfileProject.exists?(project: project1, security_scan_profile_id: scan_profile.id))
+        .to be(true)
+      expect(Security::ScanProfileProject.exists?(project: project2, security_scan_profile_id: scan_profile.id))
+        .to be(true)
+      expect(Security::ScanProfileProject.exists?(project: project3, security_scan_profile_id: scan_profile.id))
+        .to be(true)
+    end
+
+    context 'when traverse_hierarchy is false' do
+      it 'only attaches the scan profile to projects directly in the group' do
+        expect { worker.perform(group_id, scan_profile_id, false) }
+          .to change { Security::ScanProfileProject.count }.by(1)
+      end
+    end
+
+    context 'when group does not exist' do
+      let(:group_id) { non_existing_record_id }
+
+      it 'exits gracefully without raising an error' do
+        expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error
+      end
+
+      it 'does not create any records' do
+        expect { worker.perform(group_id, scan_profile_id) }
+          .not_to change { Security::ScanProfileProject.count }
+      end
+    end
+
+    context 'when scan profile does not exist' do
+      let(:scan_profile_id) { non_existing_record_id }
+
+      it 'exits gracefully without raising an error' do
+        expect { worker.perform(group_id, scan_profile_id) }.not_to raise_error
+      end
+
+      it 'does not create any records' do
+        expect { worker.perform(group_id, scan_profile_id) }
+          .not_to change { Security::ScanProfileProject.count }
+      end
+    end
+  end
+end
diff --git a/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb b/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb
index 877b072e955329d3361e70ac7f8ddf4b7f1db458..a2158f1e05e3ec6e47cbb63c4ea486709deab089 100644
--- a/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb
+++ b/ee/spec/workers/security/scan_profiles/process_project_transfer_events_worker_spec.rb
@@ -3,6 +3,8 @@
 require 'spec_helper'
 
 RSpec.describe Security::ScanProfiles::ProcessProjectTransferEventsWorker, feature_category: :security_asset_inventories do
+  include ExclusiveLeaseHelpers
+
   let_it_be(:old_namespace) { create(:group) }
   let_it_be(:new_namespace) { create(:group) }
 
@@ -39,6 +41,32 @@
   subject(:handle_event) { worker.handle_event(project_event) }
 
   describe '#handle_event' do
+    it 'uses in_lock for processing' do
+      project.update!(namespace: new_namespace)
+
+      lease_key = "update_scan_profile:namespace:#{old_namespace.id}"
+      expect(worker).to receive(:in_lock)
+        .with(lease_key, ttl: described_class::LEASE_TIMEOUT, retries: 0)
+        .and_call_original
+
+      handle_event
+    end
+
+    context 'when exclusive lease cannot be obtained' do
+      before do
+        project.update!(namespace: new_namespace)
+        lease_key = "update_scan_profile:namespace:#{old_namespace.id}"
+        stub_exclusive_lease_taken(lease_key)
+      end
+
+      it 'does not process the transfer and re-raises so Sidekiq retries the event' do
+        count_before = Security::ScanProfileProject.count
+
+        expect { handle_event }.to raise_error(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
+        expect(Security::ScanProfileProject.count).to eq(count_before)
+      end
+    end
+
     context 'when there is no project associated with the event' do
       let(:project_id) { non_existing_record_id }
 