diff --git a/ee/app/workers/security/sync_policy_worker.rb b/ee/app/workers/security/sync_policy_worker.rb index 9d58913612d52bd47124aa31db7b19fce3b38da3..0fd36b3b351c19560f3a45596d0ea8f27b7bd48e 100644 --- a/ee/app/workers/security/sync_policy_worker.rb +++ b/ee/app/workers/security/sync_policy_worker.rb @@ -12,6 +12,9 @@ class SyncPolicyWorker feature_category :security_policy_management + BATCH_DELAY_INTERVAL = 1.second + BATCH_SIZE = 100 + def handle_event(event) security_policy_id = event.data[:security_policy_id] policy = Security::Policy.find_by_id(security_policy_id) || return @@ -72,16 +75,36 @@ def sync_policy_for_projects(policy, event_data = {}, event = nil) clear_policy_sync_state(policy.security_orchestration_policy_configuration_id) + batch_index = 0 + feature_enabled = Feature.enabled?(:security_policies_batched_sync_delay, + policy.security_policy_management_project) + policy.security_orchestration_policy_configuration.all_project_ids do |project_ids| append_projects_to_sync(policy.security_orchestration_policy_configuration_id, project_ids) - ::Security::SyncProjectPolicyWorker.bulk_perform_async_with_contexts( - project_ids, - arguments_proc: ->(project_id) do - [project_id, policy.id, event_data, event_payload.deep_stringify_keys] - end, - context_proc: ->(_) { config_context } - ) + if feature_enabled + project_ids.each_slice(BATCH_SIZE).with_index do |batch, slice_index| + # Offset by one: batch 0 runs after one interval and each following batch one interval later. + delay = (batch_index + slice_index + 1) * BATCH_DELAY_INTERVAL + + ::Security::SyncProjectPolicyWorker.bulk_perform_in_with_contexts(delay, batch, + arguments_proc: ->(project_id) do + [project_id, policy.id, event_data, event_payload.deep_stringify_keys] + end, + context_proc: ->(_) { config_context } + ) + end + else + ::Security::SyncProjectPolicyWorker.bulk_perform_async_with_contexts( + project_ids, + arguments_proc: ->(project_id) do + [project_id, policy.id, event_data, event_payload.deep_stringify_keys] + end, + context_proc: ->(_) { 
config_context } + ) + end + + batch_index += (project_ids.size.to_f / BATCH_SIZE).ceil end end diff --git a/ee/config/feature_flags/gitlab_com_derisk/security_policies_batched_sync_delay.yml b/ee/config/feature_flags/gitlab_com_derisk/security_policies_batched_sync_delay.yml new file mode 100644 index 0000000000000000000000000000000000000000..0b325716f2aaeb3d615f3c614bd006ffe4a0506b --- /dev/null +++ b/ee/config/feature_flags/gitlab_com_derisk/security_policies_batched_sync_delay.yml @@ -0,0 +1,10 @@ +--- +name: security_policies_batched_sync_delay +description: Schedule project policy sync jobs in delayed batches instead of enqueueing them all at once +feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/580036 +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/216298 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/584386 +milestone: '18.8' +group: group::security policies +type: gitlab_com_derisk +default_enabled: false diff --git a/ee/spec/workers/security/sync_policy_worker_spec.rb b/ee/spec/workers/security/sync_policy_worker_spec.rb index 64ce497d64d3f539e3d89c4b552d9f13fa2afb93..83e682c6b9e79176c1eee5df6652f9ec8f36a37b 100644 --- a/ee/spec/workers/security/sync_policy_worker_spec.rb +++ b/ee/spec/workers/security/sync_policy_worker_spec.rb @@ -83,6 +83,62 @@ end end + shared_examples 'syncs projects with feature flag behavior' do + it 'calls Security::SyncProjectPolicyWorker with batching and delays' do + expect(::Security::SyncProjectPolicyWorker) + .to receive(:bulk_perform_in_with_contexts) + .with( + 1.second, + match_array(project_ids), + arguments_proc: kind_of(Proc), + context_proc: kind_of(Proc) + ) + + handle_event + end + + context 'when security_policies_batched_sync_delay is disabled' do + before do + stub_feature_flags(security_policies_batched_sync_delay: false) + end + + it 'calls Security::SyncProjectPolicyWorker without delays' do + expect(::Security::SyncProjectPolicyWorker) + .to receive(:bulk_perform_async_with_contexts) + .with( + match_array(project_ids), + arguments_proc: 
kind_of(Proc), + context_proc: kind_of(Proc) + ) + + handle_event + end + end + end + + shared_examples 'batches projects correctly' do |batch_count, batch_sizes| + it 'batches projects and applies delays' do + expect(::Security::SyncProjectPolicyWorker).to receive(:bulk_perform_in_with_contexts) + .exactly(batch_count).times.and_call_original + + handle_event + end + + it 'distributes projects across batches correctly' do + batches = [] + + allow(::Security::SyncProjectPolicyWorker).to receive(:bulk_perform_in_with_contexts) do |_delay, batch, **_| + batches << batch + end + + handle_event + + # Compare the full list of recorded batch sizes in one expectation so a missing or + # surplus batch fails cleanly instead of raising on nil or going unnoticed. + expect(batches.map(&:size)).to eq(batch_sizes) + end + end + context 'when event is Security::PolicyDeletedEvent' do let(:policy_deleted_event) do Security::PolicyDeletedEvent.new(data: { security_policy_id: policy.id }) @@ -110,16 +166,10 @@ let(:event) { policy_created_event } end - it 'calls Security::SyncProjectPolicyWorker' do - expect(::Security::SyncProjectPolicyWorker) - .to receive(:bulk_perform_async_with_contexts) - .with( - [project.id], - arguments_proc: kind_of(Proc), - context_proc: kind_of(Proc) - ) + context 'with single project' do + let(:project_ids) { [project.id] } - handle_event + it_behaves_like 'syncs projects with feature flag behavior' end it 'calls ComplianceFrameworks::SyncService' do @@ -146,6 +196,7 @@ context 'when policy_configuration is scoped to a namespace with multiple projects' do let_it_be(:namespace) { create(:namespace) } + let(:project_ids) { [project1.id, project2.id] } let_it_be(:project1) { create(:project, namespace: namespace) } let_it_be(:project2) { create(:project, namespace: namespace) } let_it_be(:policy_configuration) do @@ -158,17 +209,7 @@ stub_csp_group(nil) end - it 'calls Security::SyncProjectPolicyWorker' do - expect(::Security::SyncProjectPolicyWorker) - .to receive(:bulk_perform_async_with_contexts) - .with( - match_array([project1.id, project2.id]), - arguments_proc: 
kind_of(Proc), - context_proc: kind_of(Proc) - ) - - handle_event - end + it_behaves_like 'syncs projects with feature flag behavior' end it_behaves_like 'triggers SyncPipelineExecutionPolicyMetadataWorker' @@ -193,20 +234,15 @@ let(:event) { policy_updated_event } end - it 'calls Security::SyncProjectPolicyWorker' do - expect(::Security::SyncProjectPolicyWorker) - .to receive(:bulk_perform_async_with_contexts) - .with( - [project.id], - arguments_proc: kind_of(Proc), - context_proc: kind_of(Proc) - ) + context 'with single project' do + let(:project_ids) { [project.id] } - handle_event + it_behaves_like 'syncs projects with feature flag behavior' end context 'when policy_configuration is scoped to a namespace with multiple projects' do let_it_be(:namespace) { create(:namespace) } + let(:project_ids) { [project1.id, project2.id] } let_it_be(:project1) { create(:project, namespace: namespace) } let_it_be(:project2) { create(:project, namespace: namespace) } let_it_be(:policy_configuration) do @@ -219,17 +255,7 @@ stub_csp_group(nil) end - it 'calls Security::SyncProjectPolicyWorker' do - expect(::Security::SyncProjectPolicyWorker) - .to receive(:bulk_perform_async_with_contexts) - .with( - match_array([project1.id, project2.id]), - arguments_proc: kind_of(Proc), - context_proc: kind_of(Proc) - ) - - handle_event - end + it_behaves_like 'syncs projects with feature flag behavior' end context 'when policy actions is changed' do @@ -244,17 +270,9 @@ } end - it 'calls Security::SyncProjectPolicyWorker' do - expect(::Security::SyncProjectPolicyWorker) - .to receive(:bulk_perform_async_with_contexts) - .with( - [project.id], - arguments_proc: kind_of(Proc), - context_proc: kind_of(Proc) - ) + let(:project_ids) { [project.id] } - handle_event - end + it_behaves_like 'syncs projects with feature flag behavior' end context 'when policy scope changed' do @@ -266,17 +284,9 @@ } end - it 'calls Security::SyncProjectPolicyWorker' do - expect(::Security::SyncProjectPolicyWorker) 
- .to receive(:bulk_perform_async_with_contexts) - .with( - [project.id], - arguments_proc: kind_of(Proc), - context_proc: kind_of(Proc) - ) + let(:project_ids) { [project.id] } - handle_event - end + it_behaves_like 'syncs projects with feature flag behavior' it 'calls ComplianceFrameworks::SyncService with policy_diff' do expect(Security::SecurityOrchestrationPolicies::ComplianceFrameworks::SyncService) @@ -355,16 +365,10 @@ it_behaves_like 'clears policy sync state' - it 'calls Security::SyncProjectPolicyWorker with event payload' do - expect(::Security::SyncProjectPolicyWorker) - .to receive(:bulk_perform_async_with_contexts) - .with( - [project.id], - arguments_proc: kind_of(Proc), - context_proc: kind_of(Proc) - ) + context 'with single project' do + let(:project_ids) { [project.id] } - handle_event + it_behaves_like 'syncs projects with feature flag behavior' end it 'calls ComplianceFrameworks::SyncService' do @@ -405,4 +409,90 @@ end end end + + describe 'batching and delays for large number of projects' do + let(:policy_created_event) { Security::PolicyCreatedEvent.new(data: { security_policy_id: policy.id }) } + + subject(:handle_event) { described_class.new.handle_event(policy_created_event) } + + context 'when policy_configuration affects more than BATCH_SIZE projects' do + let_it_be(:namespace) { create(:namespace) } + let_it_be(:projects) { create_list(:project, 10, namespace: namespace) } + let_it_be(:policy_configuration) do + create(:security_orchestration_policy_configuration, namespace: namespace, project: nil) + end + + let_it_be(:policy) do + create(:security_policy, security_orchestration_policy_configuration: policy_configuration) + end + + before do + stub_csp_group(nil) + stub_const("#{described_class}::BATCH_SIZE", 3) + end + + it_behaves_like 'batches projects correctly', 4, [3, 3, 3, 1] + end + + context 'when policy_configuration affects fewer projects than BATCH_SIZE' do + let_it_be(:namespace) { create(:namespace) } + 
let_it_be(:projects) { create_list(:project, 2, namespace: namespace) } + let_it_be(:policy_configuration) do + create(:security_orchestration_policy_configuration, namespace: namespace, project: nil) + end + + let_it_be(:policy) do + create(:security_policy, security_orchestration_policy_configuration: policy_configuration) + end + + before do + stub_csp_group(nil) + stub_const("#{described_class}::BATCH_SIZE", 3) + end + + it 'creates a single batch with 1 second delay' do + expect(::Security::SyncProjectPolicyWorker) + .to receive(:bulk_perform_in_with_contexts) + .once + .with(1.second, match_array(projects.map(&:id)), arguments_proc: kind_of(Proc), context_proc: kind_of(Proc)) + + handle_event + end + end + + context 'when feature flag is disabled' do + before do + stub_feature_flags(security_policies_batched_sync_delay: false) + end + + context 'when policy_configuration affects multiple projects' do + let_it_be(:namespace) { create(:namespace) } + let_it_be(:projects) { create_list(:project, 10, namespace: namespace) } + let_it_be(:policy_configuration) do + create(:security_orchestration_policy_configuration, namespace: namespace, project: nil) + end + + let_it_be(:policy) do + create(:security_policy, security_orchestration_policy_configuration: policy_configuration) + end + + before do + stub_csp_group(nil) + end + + it 'calls bulk_perform_async_with_contexts without delays' do + expect(::Security::SyncProjectPolicyWorker) + .to receive(:bulk_perform_async_with_contexts) + .once + .with( + match_array(projects.map(&:id)), + arguments_proc: kind_of(Proc), + context_proc: kind_of(Proc) + ) + + handle_event + end + end + end + end end