diff --git a/doc/api/graphql/reference/_index.md b/doc/api/graphql/reference/_index.md
index 495cd3f1610a54eb5d2e857a5663fb3bf771b64e..1e4b242757cee16d807ac27417562be8535b9af8 100644
--- a/doc/api/graphql/reference/_index.md
+++ b/doc/api/graphql/reference/_index.md
@@ -2139,6 +2139,7 @@ Input type: `AdminSidekiqQueuesDeleteJobsInput`
| `mergeActionStatus` | [`String`](#string) | Delete jobs matching merge_action_status in the context metadata. |
| `organizationId` | [`String`](#string) | Delete jobs matching organization_id in the context metadata. |
| `pipelineId` | [`String`](#string) | Delete jobs matching pipeline_id in the context metadata. |
+| `policySyncConfigId` | [`String`](#string) | Delete jobs matching policy_sync_config_id in the context metadata. |
| `project` | [`String`](#string) | Delete jobs matching project in the context metadata. |
| `queueName` | [`String!`](#string) | Name of the queue to delete jobs from. |
| `relatedClass` | [`String`](#string) | Delete jobs matching related_class in the context metadata. |
@@ -37503,6 +37504,20 @@ Check permissions for the current user on a vulnerability finding.
| `owner` | [`UserCore!`](#usercore) | Owner of the pipeline trigger token. |
| `token` | [`String!`](#string) | Value of the pipeline trigger token. |
+### `PoliciesSyncUpdated`
+
+Security policy state synchronization update. Returns `null` if the `security_policy_sync_propagation_tracking` feature flag is disabled.
+
+#### Fields
+
+| Name | Type | Description |
+| ---- | ---- | ----------- |
+| `failedProjects` | [`[String!]`](#string) | IDs of failed projects. |
+| `mergeRequestsProgress` | [`Float`](#float) | Percentage of merge requests synced. |
+| `mergeRequestsTotal` | [`Int`](#int) | Total number of merge requests synced. |
+| `projectsProgress` | [`Float`](#float) | Percentage of projects synced. |
+| `projectsTotal` | [`Int`](#int) | Total number of projects synced. |
+
### `PolicyAnyMergeRequestViolation`
Represents policy violation for `any_merge_request` report_type.
@@ -50891,6 +50906,12 @@ A `SbomOccurrenceID` is a global ID. It is encoded as a string.
An example `SbomOccurrenceID` is: `"gid://gitlab/Sbom::Occurrence/1"`.
+### `SecurityOrchestrationPolicyConfigurationID`
+
+A `SecurityOrchestrationPolicyConfigurationID` is a global ID. It is encoded as a string.
+
+An example `SecurityOrchestrationPolicyConfigurationID` is: `"gid://gitlab/Security::OrchestrationPolicyConfiguration/1"`.
+
### `SecurityProjectSecurityExclusionID`
A `SecurityProjectSecurityExclusionID` is a global ID. It is encoded as a string.
diff --git a/ee/app/graphql/ee/graphql_triggers.rb b/ee/app/graphql/ee/graphql_triggers.rb
index d612c957ab61f0d970c011dfc65a2954882a3535..9269de6a64451d026d342fb200173422f4212e24 100644
--- a/ee/app/graphql/ee/graphql_triggers.rb
+++ b/ee/app/graphql/ee/graphql_triggers.rb
@@ -68,6 +68,22 @@ def self.security_policy_project_created(container, status, security_policy_proj
{ status: status, errors: errors, error_message: error_message, project: security_policy_project }
)
end
+
+ def self.security_policies_sync_updated(
+ policy_configuration,
+ projects_progress,
+ projects_total,
+ failed_projects,
+ merge_requests_progress,
+ merge_requests_total
+ )
+ ::GitlabSchema.subscriptions.trigger(
+ :security_policies_sync_updated,
+ { policy_configuration_id: policy_configuration.to_global_id },
+ { projects_progress: projects_progress, projects_total: projects_total, failed_projects: failed_projects,
+ merge_requests_progress: merge_requests_progress, merge_requests_total: merge_requests_total }
+ )
+ end
end
end
end
diff --git a/ee/app/graphql/ee/types/subscription_type.rb b/ee/app/graphql/ee/types/subscription_type.rb
index 8d5aa918566eff06bb9c2d340e5237b1c3a58c1e..90ec9c5fa418fa318bfbff0807a093a7989209de 100644
--- a/ee/app/graphql/ee/types/subscription_type.rb
+++ b/ee/app/graphql/ee/types/subscription_type.rb
@@ -40,6 +40,11 @@ def self.authorization_scopes
subscription: Subscriptions::Security::PolicyProjectCreated, null: true,
description: 'Triggered when the security policy project is created for a specific group or project.',
experiment: { milestone: '17.3' }
+
+ field :security_policies_sync_updated,
+ subscription: Subscriptions::Security::PoliciesSyncUpdated, null: true,
+ description: 'Triggered when the security policy sync updates the status.',
+ experiment: { milestone: '18.4' }
end
end
end
diff --git a/ee/app/graphql/subscriptions/security/policies_sync_updated.rb b/ee/app/graphql/subscriptions/security/policies_sync_updated.rb
new file mode 100644
index 0000000000000000000000000000000000000000..37ca81ef9318ae181f3555bfd92220e9ffc81cd4
--- /dev/null
+++ b/ee/app/graphql/subscriptions/security/policies_sync_updated.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+module Subscriptions
+ module Security
+ class PoliciesSyncUpdated < ::Subscriptions::BaseSubscription
+ include Gitlab::Graphql::Laziness
+
+ payload_type Types::GitlabSubscriptions::Security::PoliciesSyncUpdated
+
+ argument :policy_configuration_id, ::Types::GlobalIDType[::Security::OrchestrationPolicyConfiguration],
+ required: true,
+ description: 'ID of the security orchestration policy configuration.'
+
+ def authorized?(policy_configuration_id:)
+ policy_configuration = find_policy_configuration(policy_configuration_id) || unauthorized!
+
+ if current_user.can?(:update_security_orchestration_policy_project,
+ policy_configuration.security_policy_management_project)
+ return true
+ end
+
+ unauthorized!
+ end
+
+ def update(_)
+ {
+ projects_progress: object[:projects_progress],
+ projects_total: object[:projects_total],
+ failed_projects: object[:failed_projects],
+ merge_requests_progress: object[:merge_requests_progress],
+ merge_requests_total: object[:merge_requests_total]
+ }
+ end
+
+ private
+
+ # Resolves the given global ID through GitlabSchema and forces the result
+ # before returning it.
+ # NOTE(review): `force` is presumably provided by the included
+ # Gitlab::Graphql::Laziness module — confirm.
+ def find_policy_configuration(id)
+ force(GitlabSchema.find_by_gid(id))
+ end
+ end
+ end
+end
diff --git a/ee/app/graphql/types/gitlab_subscriptions/security/policies_sync_updated.rb b/ee/app/graphql/types/gitlab_subscriptions/security/policies_sync_updated.rb
new file mode 100644
index 0000000000000000000000000000000000000000..e297921bedffd9ee0e14057236275ff52f44ce07
--- /dev/null
+++ b/ee/app/graphql/types/gitlab_subscriptions/security/policies_sync_updated.rb
@@ -0,0 +1,38 @@
+# frozen_string_literal: true
+
+module Types
+ module GitlabSubscriptions
+ module Security
+ # rubocop:disable Graphql/AuthorizeTypes -- Authorization is handled in subscription
+ # rubocop:disable GraphQL/ExtractType -- Not worth extracting the merge_requests_* fields into a separate type
+ class PoliciesSyncUpdated < ::Types::BaseObject
+ graphql_name 'PoliciesSyncUpdated'
+ # rubocop:disable Layout/LineLength -- Ensures correct Markdown rendering
+ description 'Security policy state synchronization update. Returns `null` if the `security_policy_sync_propagation_tracking` feature flag is disabled.'
+ # rubocop:enable Layout/LineLength
+
+ field :projects_progress, GraphQL::Types::Float,
+ null: true,
+ description: 'Percentage of projects synced.'
+
+ field :projects_total, GraphQL::Types::Int,
+ null: true,
+ description: 'Total number of projects synced.'
+
+ field :failed_projects, [GraphQL::Types::String],
+ null: true,
+ description: 'IDs of failed projects.'
+
+ field :merge_requests_progress, GraphQL::Types::Float,
+ null: true,
+ description: 'Percentage of merge requests synced.'
+
+ field :merge_requests_total, GraphQL::Types::Int,
+ null: true,
+ description: 'Total number of merge requests synced.'
+ end
+ # rubocop:enable GraphQL/ExtractType
+ # rubocop:enable Graphql/AuthorizeTypes
+ end
+ end
+end
diff --git a/ee/config/feature_flags/gitlab_com_derisk/security_policy_sync_propagation_tracking.yml b/ee/config/feature_flags/gitlab_com_derisk/security_policy_sync_propagation_tracking.yml
new file mode 100644
index 0000000000000000000000000000000000000000..c8fe9136f2a8d8c3ac4e6824bad4857f6deb1ec5
--- /dev/null
+++ b/ee/config/feature_flags/gitlab_com_derisk/security_policy_sync_propagation_tracking.yml
@@ -0,0 +1,10 @@
+---
+name: security_policy_sync_propagation_tracking
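+description: Tracks security policy sync propagation progress and broadcasts it through the securityPoliciesSyncUpdated GraphQL subscription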
+feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/559273
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/201777
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/561007
+milestone: '18.4'
+group: group::security policies
+type: gitlab_com_derisk
+default_enabled: false
diff --git a/ee/lib/ee/gitlab/application_context.rb b/ee/lib/ee/gitlab/application_context.rb
index 67177f6326403a648955cf0d64b28c30432dd2db..27fb0c2071d2bdb294c1e6778765e1172cd71549 100644
--- a/ee/lib/ee/gitlab/application_context.rb
+++ b/ee/lib/ee/gitlab/application_context.rb
@@ -10,11 +10,13 @@ module ApplicationContext
EE_KNOWN_KEYS = [
:subscription_plan,
- :ai_resource
+ :ai_resource,
+ ::Security::SecurityOrchestrationPolicies::PolicySyncState::POLICY_SYNC_CONTEXT_KEY
].freeze
EE_APPLICATION_ATTRIBUTES = [
- Attribute.new(:ai_resource, ::GlobalID)
+ Attribute.new(:ai_resource, ::GlobalID),
+ Attribute.new(::Security::SecurityOrchestrationPolicies::PolicySyncState::POLICY_SYNC_CONTEXT_KEY, Integer)
].freeze
class_methods do
@@ -35,6 +37,9 @@ def application_attributes
def to_lazy_hash
super.tap do |hash|
assign_hash_if_value(hash, :ai_resource)
+ # rubocop:disable Layout/LineLength -- namespaced constant
+ assign_hash_if_value(hash, ::Security::SecurityOrchestrationPolicies::PolicySyncState::POLICY_SYNC_CONTEXT_KEY)
+ # rubocop:enable Layout/LineLength
hash[:subscription_plan] = -> { subcription_plan_name } if include_namespace?
end
diff --git a/ee/lib/security/security_orchestration_policies/policy_sync_state.rb b/ee/lib/security/security_orchestration_policies/policy_sync_state.rb
new file mode 100644
index 0000000000000000000000000000000000000000..5ad5452ac72b1b5960fb7a77c95190f6937374fa
--- /dev/null
+++ b/ee/lib/security/security_orchestration_policies/policy_sync_state.rb
@@ -0,0 +1,300 @@
+# frozen_string_literal: true
+
+module Security
+ module SecurityOrchestrationPolicies
+ module PolicySyncState
+ POLICY_SYNC_TTL = 24.hours.to_i
+ POLICY_SYNC_CONTEXT_KEY = :policy_sync_config_id
+
+ class State
+ include Gitlab::Utils::StrongMemoize
+
+ def self.from_application_context
+ config_id = Gitlab::ApplicationContext.current_context_attribute(POLICY_SYNC_CONTEXT_KEY)&.to_i || return
+
+ new(config_id)
+ end
+
+ attr_reader :config_id
+
+ def initialize(config_id)
+ @config_id = config_id
+ end
+
+ # Appends project IDs, adding to the pending set and incrementing the total counter.
+ # Does nothing when the feature flag is disabled or `project_ids` is empty.
+ #
+ # NOTE(review): the total is incremented by `project_ids.size` even when some IDs are
+ # already members of the pending set, so repeated IDs inflate the total — confirm
+ # callers never append the same project twice within one sync.
+ def append_projects(project_ids)
+ return if feature_disabled? || project_ids.empty?
+
+ with_redis do |redis|
+ # The writes and their TTL refreshes are queued in a single MULTI block.
+ redis.multi do |multi|
+ multi.sadd(projects_sync_key, project_ids)
+ multi.incrby(total_projects_key, project_ids.size)
+
+ multi.expire(projects_sync_key, POLICY_SYNC_TTL)
+ multi.expire(total_projects_key, POLICY_SYNC_TTL)
+ end
+ end
+ end
+
+ # Marks the project ID as successfully synced and triggers a status update
+ def finish_project(project_id)
+ return if feature_disabled?
+
+ with_redis do |redis|
+ redis.srem(projects_sync_key, project_id.to_s)
+ end
+
+ trigger_subscription
+ end
+
+ # Marks the project ID as failed to sync and triggers a status update
+ def fail_project(project_id)
+ return if feature_disabled?
+
+ with_redis do |redis|
+ redis.multi do |multi|
+ multi.sadd(failed_projects_sync_key, project_id)
+ multi.srem(projects_sync_key, project_id.to_s)
+
+ multi.expire(failed_projects_sync_key, POLICY_SYNC_TTL)
+ end
+ end
+
+ trigger_subscription
+ end
+
+ # Registers an MR for tracking and initializes the worker counter.
+ # Adds the ID to the pending merge request set, increments the merge request total,
+ # sets the per-MR worker counter to 0 and refreshes the TTL of all three keys.
+ # Does nothing when the feature flag is disabled.
+ #
+ # NOTE(review): the total is incremented and the worker counter is reset to 0 on
+ # every call, including repeat calls for an ID that is already pending — confirm
+ # this is only invoked once per merge request per sync.
+ def start_merge_request(merge_request_id)
+ return if feature_disabled?
+
+ with_redis do |redis|
+ redis.multi do |multi|
+ multi.sadd(merge_requests_sync_key, merge_request_id)
+ multi.incr(total_merge_requests_key)
+ multi.set(merge_request_workers_sync_key(merge_request_id), 0)
+
+ multi.expire(merge_requests_sync_key, POLICY_SYNC_TTL)
+ multi.expire(total_merge_requests_key, POLICY_SYNC_TTL)
+ multi.expire(merge_request_workers_sync_key(merge_request_id), POLICY_SYNC_TTL)
+ end
+ end
+ end
+
+ # Increments the worker counter for a given merge request
+ def start_merge_request_worker(merge_request_id)
+ return if feature_disabled?
+
+ with_redis do |redis|
+ redis.incr(merge_request_workers_sync_key(merge_request_id))
+ end
+ end
+
+ # Decrements the merge request worker count and if it hits zero, marks the merge request
+ # as fully synced
+ def finish_merge_request_worker(merge_request_id)
+ return if feature_disabled?
+
+ with_redis do |redis|
+ new_value = redis.decr(merge_request_workers_sync_key(merge_request_id))
+
+ if new_value <= 0
+ redis.srem(merge_requests_sync_key, merge_request_id.to_s)
+
+ trigger_subscription
+ end
+ end
+ end
+
+ # Whether a sync is currently being tracked for this configuration.
+ # Returns false when the feature flag is disabled. Otherwise the result is truthy
+ # only if both total counters exist and at least one of the pending sets
+ # (projects or merge requests) is non-empty.
+ #
+ # NOTE(review): because both total keys must exist, a sync with pending projects
+ # but no merge request registered yet is reported as not in progress — confirm
+ # this is intended.
+ def sync_in_progress?
+ return false if feature_disabled?
+
+ with_redis do |redis|
+ # The four reads are queued in one MULTI block; results come back in call order.
+ conditions = redis.multi do |multi|
+ # rubocop:disable CodeReuse/ActiveRecord -- false positive
+ multi.exists?(total_projects_key)
+ multi.exists?(total_merge_requests_key)
+ # rubocop:enable CodeReuse/ActiveRecord
+
+ multi.scard(projects_sync_key)
+ multi.scard(merge_requests_sync_key)
+ end
+
+ # rubocop:disable Layout/LineLength -- TODO
+ conditions.then do |total_projects, total_merge_requests, project_pending_count, merge_request_pending_count|
+ total_projects && total_merge_requests && (project_pending_count > 0 || merge_request_pending_count > 0)
+ end
+ # rubocop:enable Layout/LineLength
+ end
+ end
+
+ # Deletes the pending/failed sets and the total counters for this configuration.
+ # Does nothing when the feature flag is disabled.
+ #
+ # The per-merge-request worker counters (see merge_request_workers_sync_key) are
+ # not deleted here.
+ def clear
+ return if feature_disabled?
+
+ with_redis do |redis|
+ redis.del(projects_sync_key)
+ redis.del(total_projects_key)
+ redis.del(failed_projects_sync_key)
+ redis.del(merge_requests_sync_key)
+ redis.del(total_merge_requests_key)
+ end
+ end
+
+ # Pending project IDs: the members of the pending projects set.
+ # Read without consulting the feature flag.
+ def pending_projects
+ get_items(projects_sync_key)
+ end
+
+ # Failed project IDs: the members of the failed projects set.
+ # Read without consulting the feature flag.
+ def failed_projects
+ get_items(failed_projects_sync_key)
+ end
+
+ # Pending merge request IDs: the members of the pending merge requests set.
+ # Read without consulting the feature flag.
+ def pending_merge_requests
+ get_items(merge_requests_sync_key)
+ end
+
+ # Current value of the worker counter for the given merge request,
+ # or nil when the counter key does not exist.
+ def total_merge_request_workers_count(merge_request_id)
+ with_redis do |redis|
+ redis.get(merge_request_workers_sync_key(merge_request_id))&.to_i
+ end
+ end
+
+ # Value of the total projects counter (incremented by append_projects),
+ # or nil when the counter key does not exist. This is not the pending count.
+ def total_project_count
+ with_redis do |redis|
+ redis.get(total_projects_key)&.to_i
+ end
+ end
+
+ # Value of the total merge requests counter (incremented by start_merge_request),
+ # or nil when the counter key does not exist. This is not the pending count.
+ def total_merge_request_count
+ with_redis do |redis|
+ redis.get(total_merge_requests_key)&.to_i
+ end
+ end
+
+ private
+
+ # Common prefix of every Redis key built for this configuration's sync state.
+ # NOTE(review): the surrounding braces look like a Redis Cluster hash tag, which
+ # would keep all of these keys in the same slot — confirm that is the intent.
+ def redis_key_tag
+ "{security_policy_sync:#{config_id}}"
+ end
+
+ # Set: project IDs pending synchronization
+ def projects_sync_key
+ "#{redis_key_tag}:projects"
+ end
+
+ # Integer: initial total number of projects for percentage calculation
+ def total_projects_key
+ "#{redis_key_tag}:total_projects"
+ end
+
+ # Set: merge request IDs with at least one active sync worker
+ def merge_requests_sync_key
+ "#{redis_key_tag}:merge_requests"
+ end
+
+ # Integer: Countdown for active downstream workers for a merge request
+ def merge_request_workers_sync_key(merge_request_id)
+ "#{redis_key_tag}:merge_requests:#{merge_request_id}:workers"
+ end
+
+ # Integer: Total number of unique merge requests processed during sync
+ def total_merge_requests_key
+ "#{redis_key_tag}:total_merge_requests"
+ end
+
+ # Set: Project IDs that failed to sync after all retries
+ def failed_projects_sync_key
+ "#{redis_key_tag}:failed_projects"
+ end
+
+ def get_progress(pending, total)
+ return if total == 0
+
+ ((total - pending).to_f / total * 100).round
+ end
+
+ def trigger_subscription
+ projects_pending, projects_total, all_failed_projects, merge_requests_pending, merge_requests_total =
+ with_redis do |redis|
+ [
+ redis.scard(projects_sync_key),
+ redis.get(total_projects_key).to_i,
+ redis.smembers(failed_projects_sync_key),
+ redis.scard(merge_requests_sync_key),
+ redis.get(total_merge_requests_key).to_i
+ ]
+ end
+
+ GraphqlTriggers.security_policies_sync_updated(
+ policy_configuration,
+ get_progress(projects_pending, projects_total),
+ projects_total,
+ all_failed_projects,
+ get_progress(merge_requests_pending, merge_requests_total),
+ merge_requests_total
+ )
+ end
+
+ def feature_disabled?
+ strong_memoize_with(:feature_disabled, config_id) do
+ project = policy_configuration&.security_policy_management_project
+
+ break true unless project
+
+ Feature.disabled?(:security_policy_sync_propagation_tracking, project)
+ end
+ end
+
+ # Memoized lookup of the Security::OrchestrationPolicyConfiguration for `config_id`.
+ # Callers treat the result as possibly nil (see the safe navigation in
+ # feature_disabled?).
+ def policy_configuration
+ strong_memoize_with(:policy_configuration, config_id) do
+ Security::OrchestrationPolicyConfiguration.find_by_id(config_id)
+ end
+ end
+
+ def get_items(key)
+ with_redis do |redis|
+ redis.smembers(key)
+ end
+ end
+
+ def with_redis(&block)
+ Gitlab::Redis::SharedState.with(&block) # rubocop:disable CodeReuse/ActiveRecord -- false positive
+ end
+ end
+
+ # Mixin with thin wrappers around State.
+ #
+ # The first two methods take the configuration ID explicitly. The remaining ones
+ # resolve the State from the current application context and silently do nothing
+ # when the context carries no policy sync configuration ID.
+ module Callbacks
+ # Clears the tracked sync state for the given configuration ID.
+ def clear_policy_sync_state(config_id)
+ State.new(config_id).clear
+ end
+
+ # Registers the given project IDs as pending for the given configuration ID.
+ def append_projects_to_sync(config_id, project_ids)
+ State.new(config_id).append_projects(project_ids)
+ end
+
+ # Marks the project as synced for the configuration found in the application context.
+ def finish_project_policy_sync(project_id)
+ State.from_application_context&.finish_project(project_id)
+ end
+
+ # Marks the project as failed for the configuration found in the application context.
+ def fail_project_policy_sync(project_id)
+ State.from_application_context&.fail_project(project_id)
+ end
+
+ # Registers the merge request for the configuration found in the application context.
+ def start_merge_request_policy_sync(merge_request_id)
+ State.from_application_context&.start_merge_request(merge_request_id)
+ end
+
+ # Increments the merge request's worker counter for the configuration found in the
+ # application context.
+ def start_merge_request_worker_policy_sync(merge_request_id)
+ State.from_application_context&.start_merge_request_worker(merge_request_id)
+ end
+
+ # Decrements the merge request's worker counter for the configuration found in the
+ # application context.
+ def finish_merge_request_worker_policy_sync(merge_request_id)
+ State.from_application_context&.finish_merge_request_worker(merge_request_id)
+ end
+ end
+ end
+ end
+end
diff --git a/ee/spec/graphql/graphql_triggers_spec.rb b/ee/spec/graphql/graphql_triggers_spec.rb
index fdf23b054f62b49b3b52703cfc322b1aa161cc7b..e6115a6fc92c10b2e639e4c38c9f3b9d8f3e522b 100644
--- a/ee/spec/graphql/graphql_triggers_spec.rb
+++ b/ee/spec/graphql/graphql_triggers_spec.rb
@@ -188,4 +188,40 @@
end
end
end
+
+ describe '.security_policies_sync_updated', feature_category: :security_policy_management do
+ subject(:trigger) do
+ described_class.security_policies_sync_updated(
+ policy_configuration,
+ projects,
+ projects_total,
+ failed_projects,
+ merge_requests,
+ merge_requests_total
+ )
+ end
+
+ let_it_be(:policy_configuration) { create(:security_orchestration_policy_configuration) }
+ let(:projects) { 75.5 }
+ let(:projects_total) { 100 }
+ let(:failed_projects) { [non_existing_record_id.to_s] }
+ let(:merge_requests) { 50.0 }
+ let(:merge_requests_total) { 200 }
+
+ specify do
+ expect(GitlabSchema.subscriptions).to receive(:trigger).with(
+ :security_policies_sync_updated,
+ { policy_configuration_id: policy_configuration.to_global_id },
+ {
+ projects_progress: projects,
+ projects_total: projects_total,
+ failed_projects: failed_projects,
+ merge_requests_progress: merge_requests,
+ merge_requests_total: merge_requests_total
+ }
+ ).and_call_original
+
+ trigger
+ end
+ end
end
diff --git a/ee/spec/graphql/subscriptions/security/policies_sync_updated_spec.rb b/ee/spec/graphql/subscriptions/security/policies_sync_updated_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..9492f71de95f33d767abc20261763b0990e1378a
--- /dev/null
+++ b/ee/spec/graphql/subscriptions/security/policies_sync_updated_spec.rb
@@ -0,0 +1,61 @@
+# frozen_string_literal: true
+
+require "spec_helper"
+
+RSpec.describe Subscriptions::Security::PoliciesSyncUpdated, feature_category: :security_policy_management do
+ include GraphqlHelpers
+ include ::Graphql::Subscriptions::Security::PoliciesSyncUpdated::Helper
+
+ let_it_be(:policy_configuration) { create(:security_orchestration_policy_configuration, :namespace) }
+ let_it_be(:security_policy_project) { policy_configuration.security_policy_management_project }
+ let_it_be(:current_user) { security_policy_project.creator }
+
+ let(:projects_progress) { 75.5 }
+ let(:projects_total) { 100 }
+ let(:failed_projects) { ["123"] }
+ let(:merge_requests_progress) { 50 }
+ let(:merge_requests_total) { 200 }
+
+ let(:subscribe) { security_policies_sync_updated_subscription(policy_configuration, current_user) }
+
+ before do
+ stub_licensed_features(security_orchestration_policies: true)
+
+ stub_const('GitlabSchema', Graphql::Subscriptions::ActionCable::MockGitlabSchema)
+ Graphql::Subscriptions::ActionCable::MockActionCable.clear_mocks
+ end
+
+ subject(:response) do
+ subscription_response do
+ GraphqlTriggers.security_policies_sync_updated(
+ policy_configuration,
+ projects_progress,
+ projects_total,
+ failed_projects,
+ merge_requests_progress,
+ merge_requests_total)
+ end
+ end
+
+ context 'when authorized' do
+ subject(:success_response) do
+ graphql_dig_at(graphql_data(response[:result]), :securityPoliciesSyncUpdated)
+ end
+
+ specify do
+ expect(success_response).to eq({
+ "projectsProgress" => projects_progress,
+ "projectsTotal" => projects_total,
+ "failedProjects" => failed_projects,
+ "mergeRequestsProgress" => merge_requests_progress,
+ "mergeRequestsTotal" => merge_requests_total
+ })
+ end
+ end
+
+ context 'when unauthorized' do
+ let_it_be(:current_user) { create(:user) }
+
+ it { is_expected.to be_nil }
+ end
+end
diff --git a/ee/spec/lib/security/security_orchestration_policies/policy_sync_state/callbacks_spec.rb b/ee/spec/lib/security/security_orchestration_policies/policy_sync_state/callbacks_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..ad2174e1ad283cdb0cea0ac5c7016e6bbfa0c9fd
--- /dev/null
+++ b/ee/spec/lib/security/security_orchestration_policies/policy_sync_state/callbacks_spec.rb
@@ -0,0 +1,89 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Security::SecurityOrchestrationPolicies::PolicySyncState::Callbacks, :clean_gitlab_redis_shared_state, feature_category: :security_policy_management do
+ include described_class
+
+ include_context 'with policy sync state'
+
+ let(:merge_request_id) { 1 }
+ let(:project_id) { 1 }
+
+ describe '#clear_policy_sync_state' do
+ specify do
+ expect_next_instance_of(Security::SecurityOrchestrationPolicies::PolicySyncState::State,
+ policy_configuration_id) do |state|
+ expect(state).to receive(:clear)
+ end
+
+ clear_policy_sync_state(policy_configuration_id)
+ end
+ end
+
+ describe '#append_projects_to_sync' do
+ specify do
+ expect_next_instance_of(Security::SecurityOrchestrationPolicies::PolicySyncState::State,
+ policy_configuration_id) do |state|
+ expect(state).to receive(:append_projects).with([project_id])
+ end
+
+ append_projects_to_sync(policy_configuration_id, [project_id])
+ end
+ end
+
+ describe '#finish_project_policy_sync' do
+ specify do
+ expect_next_instance_of(Security::SecurityOrchestrationPolicies::PolicySyncState::State,
+ policy_configuration_id) do |state|
+ expect(state).to receive(:finish_project).with(project_id)
+ end
+
+ finish_project_policy_sync(project_id)
+ end
+ end
+
+ describe '#fail_project_policy_sync' do
+ specify do
+ expect_next_instance_of(Security::SecurityOrchestrationPolicies::PolicySyncState::State,
+ policy_configuration_id) do |state|
+ expect(state).to receive(:fail_project).with(project_id)
+ end
+
+ fail_project_policy_sync(project_id)
+ end
+ end
+
+ describe '#start_merge_request_policy_sync' do
+ specify do
+ expect_next_instance_of(Security::SecurityOrchestrationPolicies::PolicySyncState::State,
+ policy_configuration_id) do |state|
+ expect(state).to receive(:start_merge_request).with(merge_request_id)
+ end
+
+ start_merge_request_policy_sync(merge_request_id)
+ end
+ end
+
+ describe '#start_merge_request_worker_policy_sync' do
+ specify do
+ expect_next_instance_of(Security::SecurityOrchestrationPolicies::PolicySyncState::State,
+ policy_configuration_id) do |state|
+ expect(state).to receive(:start_merge_request_worker).with(merge_request_id)
+ end
+
+ start_merge_request_worker_policy_sync(merge_request_id)
+ end
+ end
+
+ describe '#finish_merge_request_worker_policy_sync' do
+ specify do
+ expect_next_instance_of(Security::SecurityOrchestrationPolicies::PolicySyncState::State,
+ policy_configuration_id) do |state|
+ expect(state).to receive(:finish_merge_request_worker).with(merge_request_id)
+ end
+
+ finish_merge_request_worker_policy_sync(merge_request_id)
+ end
+ end
+end
diff --git a/ee/spec/lib/security/security_orchestration_policies/policy_sync_state/state_spec.rb b/ee/spec/lib/security/security_orchestration_policies/policy_sync_state/state_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..e23c7c0af94f64a36f4f2ba2d855cb7ddfa30bd0
--- /dev/null
+++ b/ee/spec/lib/security/security_orchestration_policies/policy_sync_state/state_spec.rb
@@ -0,0 +1,491 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Security::SecurityOrchestrationPolicies::PolicySyncState::State, :clean_gitlab_redis_shared_state, feature_category: :security_policy_management do
+ let(:merge_request_id) { 1 }
+ let(:project_id) { 1 }
+ let(:other_project_id) { 2 }
+
+ let_it_be(:policy_configuration) { create(:security_orchestration_policy_configuration, :namespace) }
+ let(:policy_configuration_id) { policy_configuration.id }
+
+ subject(:state) { described_class.new(policy_configuration.id) }
+
+ describe '.from_application_context' do
+ before do
+ allow(Gitlab::ApplicationContext).to receive(:current_context_attribute).and_return(context_value)
+ end
+
+ subject(:state) { described_class.from_application_context }
+
+ context 'with context key' do
+ let(:context_value) { policy_configuration.id.to_s }
+
+ let_it_be(:policy_configuration) { create(:security_orchestration_policy_configuration, :namespace) }
+
+ it { is_expected.to be_a(described_class) }
+ end
+
+ context 'without context key' do
+ let(:context_value) { nil }
+
+ it { is_expected.to be_nil }
+ end
+ end
+
+ describe '#append_projects' do
+ context 'when adding new project IDs' do
+ it 'adds project IDs as pending' do
+ expect { state.append_projects([project_id, other_project_id]) }.to change {
+ state.pending_projects
+ }.from(be_empty).to(contain_exactly(project_id.to_s, other_project_id.to_s))
+ end
+
+ it 'maintains set membership' do
+ expect { 2.times { state.append_projects([project_id]) } }.to change {
+ state.pending_projects
+ }.from(be_empty).to(contain_exactly(project_id.to_s))
+ end
+
+ it 'increments the total counter by the number of projects' do
+ state.append_projects([1, 2, 3])
+ state.append_projects([4, 5])
+
+ expect(state.total_project_count).to be(5)
+ end
+ end
+
+ context 'with feature disabled' do
+ before do
+ stub_feature_flags(security_policy_sync_propagation_tracking: false)
+ end
+
+ it 'does not add pending IDs' do
+ expect { state.append_projects([project_id, other_project_id]) }.not_to change {
+ state.pending_projects
+ }.from(be_empty)
+ end
+ end
+ end
+
+ describe '#finish_project' do
+ before do
+ state.append_projects([project_id, other_project_id])
+ end
+
+ it 'removes a pending project ID' do
+ expect { state.finish_project(project_id) }.to change {
+ state.pending_projects
+ }.from(contain_exactly(project_id.to_s, other_project_id.to_s)).to(contain_exactly(other_project_id.to_s))
+ end
+
+ context 'when project ID is not pending' do
+ it 'does not alter pending project IDs' do
+ expect { state.finish_project(4) }.not_to change {
+ state.pending_projects
+ }.from(contain_exactly(project_id.to_s, other_project_id.to_s))
+ end
+ end
+
+ it 'triggers subscription' do
+ expect(state).to receive(:trigger_subscription).exactly(:once).and_call_original
+
+ state.finish_project(other_project_id)
+ end
+
+ context 'with feature disabled' do
+ before do
+ stub_feature_flags(security_policy_sync_propagation_tracking: false)
+
+ state.clear_memoization(:feature_disabled)
+ end
+
+ it 'does not remove a pending project ID' do
+ expect { state.finish_project(project_id) }.not_to change {
+ state.pending_projects
+ }.from(contain_exactly(project_id.to_s, other_project_id.to_s))
+ end
+
+ it 'does not trigger subscription' do
+ expect(state).not_to receive(:trigger_subscription)
+
+ state.finish_project(other_project_id)
+ end
+ end
+ end
+
+ describe '#fail_project' do
+ it 'adds a project ID as failed' do
+ expect { state.fail_project(project_id) }.to change {
+ state.failed_projects
+ }.from(be_empty).to(contain_exactly(project_id.to_s))
+ end
+
+ it 'removes a project ID as pending' do
+ state.append_projects([project_id])
+
+ expect { state.fail_project(project_id) }.to change {
+ state.pending_projects
+ }.from(contain_exactly(project_id.to_s)).to(be_empty)
+ end
+
+ it 'maintains set membership' do
+ expect { 2.times { state.fail_project(project_id) } }.to change {
+ state.failed_projects
+ }.from(be_empty).to(contain_exactly(project_id.to_s))
+ end
+
+ it 'triggers subscription' do
+ expect(state).to receive(:trigger_subscription).exactly(:once).and_call_original
+
+ state.finish_project(project_id)
+ end
+
+ context 'with feature disabled' do
+ before do
+ stub_feature_flags(security_policy_sync_propagation_tracking: false)
+ end
+
+ it 'does not add a project ID as failed' do
+ expect { state.fail_project(project_id) }.not_to change {
+ state.failed_projects
+ }.from(be_empty)
+ end
+
+ it 'does not trigger subscription' do
+ expect(state).not_to receive(:trigger_subscription)
+
+ state.finish_project(project_id)
+ end
+ end
+ end
+
+ describe '#start_merge_request' do
+ it 'adds a merge request ID as pending' do
+ expect { state.start_merge_request(merge_request_id) }.to change {
+ state.pending_merge_requests
+ }.from(be_empty).to(match_array("1"))
+ end
+
+ it 'maintains set membership' do
+ expect { 2.times { state.start_merge_request(merge_request_id) } }.to change {
+ state.pending_merge_requests
+ }.from(be_empty).to(match_array("1"))
+ end
+
+ it 'increases total merge request count' do
+ expect { state.start_merge_request(merge_request_id) }.to change {
+ state.total_merge_request_count
+ }.from(nil).to(1)
+ end
+
+ it 'initializes merge request worker count' do
+ expect { state.start_merge_request(merge_request_id) }.to change {
+ state.total_merge_request_workers_count(merge_request_id)
+ }.from(nil).to(0)
+ end
+
+ context 'with feature disabled' do
+ before do
+ stub_feature_flags(security_policy_sync_propagation_tracking: false)
+ end
+
+ it 'does not add a merge request ID as pending' do
+ expect { state.start_merge_request(merge_request_id) }.not_to change {
+ state.pending_merge_requests
+ }.from(be_empty)
+ end
+
+ it 'does not increase total merge request count' do
+ expect { state.start_merge_request(merge_request_id) }.not_to change {
+ state.total_merge_request_count
+ }.from(nil)
+ end
+
+ it 'does not initialize merge request worker count' do
+ expect { state.start_merge_request(merge_request_id) }.not_to change {
+ state.total_merge_request_workers_count(merge_request_id)
+ }.from(nil)
+ end
+ end
+ end
+
+ describe '#start_merge_request_worker' do # per-merge-request worker counter
+ it 'increases merge request worker count', :aggregate_failures do
+ expect { state.start_merge_request_worker(merge_request_id) }.to change {
+ state.total_merge_request_workers_count(merge_request_id)
+ }.from(nil).to(1) # counter reads nil before the first worker is started
+
+ expect { state.start_merge_request_worker(merge_request_id) }.to change {
+ state.total_merge_request_workers_count(merge_request_id)
+ }.from(1).to(2) # every further call adds one
+ end
+
+ context 'with feature disabled' do
+ before do
+ stub_feature_flags(security_policy_sync_propagation_tracking: false)
+ end
+
+ it 'does not increase merge request worker count', :aggregate_failures do
+ expect { state.start_merge_request_worker(merge_request_id) }.not_to change {
+ state.total_merge_request_workers_count(merge_request_id)
+ }.from(nil) # flag off: the counter is never created
+
+ expect { state.start_merge_request_worker(merge_request_id) }.not_to change {
+ state.total_merge_request_workers_count(merge_request_id)
+ }.from(nil) # a repeated call is still expected to be a no-op
+ end
+ end
+ end
+
+ describe '#finish_merge_request_worker' do # decrement, pending-set removal, subscription trigger
+ before do
+ state.start_merge_request(merge_request_id) # registers the merge request as pending
+ end
+
+ context 'with last worker' do # exactly one worker is outstanding
+ before do
+ state.start_merge_request_worker(merge_request_id)
+ end
+
+ it 'decrements merge request worker count' do
+ expect { state.finish_merge_request_worker(merge_request_id) }.to change {
+ state.total_merge_request_workers_count(merge_request_id)
+ }.from(1).to(0)
+ end
+
+ it 'removes merge request from pending set' do
+ expect { state.finish_merge_request_worker(merge_request_id) }.to change {
+ state.pending_merge_requests
+ }.from(contain_exactly(merge_request_id.to_s)).to(be_empty) # IDs are read back as strings
+ end
+
+ it 'triggers subscription' do
+ expect(state).to receive(:trigger_subscription).exactly(:once).and_call_original
+
+ state.finish_merge_request_worker(merge_request_id)
+ end
+
+ context 'with feature disabled' do
+ before do
+ stub_feature_flags(security_policy_sync_propagation_tracking: false)
+
+ state.clear_memoization(:feature_disabled) # presumably memoized by the hooks above
+ end
+
+ it 'does not decrement merge request worker count' do
+ expect { state.finish_merge_request_worker(merge_request_id) }.not_to change {
+ state.total_merge_request_workers_count(merge_request_id)
+ }.from(1)
+ end
+
+ it 'does not remove merge request from pending set' do
+ expect { state.finish_merge_request_worker(merge_request_id) }.not_to change {
+ state.pending_merge_requests
+ }.from(contain_exactly(merge_request_id.to_s))
+ end
+
+ it 'does not trigger subscription' do
+ expect(state).not_to receive(:trigger_subscription)
+
+ state.finish_merge_request_worker(merge_request_id)
+ end
+ end
+ end
+
+ context 'with remaining workers' do # two workers started, only one finished per example
+ before do
+ 2.times { state.start_merge_request_worker(merge_request_id) }
+ end
+
+ it 'decrements merge request worker count' do
+ expect { state.finish_merge_request_worker(merge_request_id) }.to change {
+ state.total_merge_request_workers_count(merge_request_id)
+ }.from(2).to(1)
+ end
+
+ it 'does not remove merge request from pending set' do
+ expect { state.finish_merge_request_worker(merge_request_id) }.not_to change {
+ state.pending_merge_requests
+ }.from(contain_exactly(merge_request_id.to_s)) # one worker is still running
+ end
+
+ it 'does not trigger subscription' do
+ expect(state).not_to receive(:trigger_subscription)
+
+ state.finish_merge_request_worker(merge_request_id)
+ end
+
+ context 'with feature disabled' do
+ before do
+ stub_feature_flags(security_policy_sync_propagation_tracking: false)
+
+ state.clear_memoization(:feature_disabled) # presumably memoized by the hooks above
+ end
+
+ it 'does not decrement merge request worker count' do
+ expect { state.finish_merge_request_worker(merge_request_id) }.not_to change {
+ state.total_merge_request_workers_count(merge_request_id)
+ }.from(2)
+ end
+
+ it 'does not trigger subscription' do
+ expect(state).not_to receive(:trigger_subscription)
+
+ state.finish_merge_request_worker(merge_request_id)
+ end
+ end
+ end
+ end
+
+ describe '#sync_in_progress?' do # true while a project or merge request is still pending
+ subject(:sync_in_progress?) { state.sync_in_progress? }
+
+ it { is_expected.to be(false) } # nothing has been registered in this describe yet
+
+ context 'with project and merge request' do
+ before do
+ state.append_projects([project_id])
+ state.start_merge_request(merge_request_id)
+ state.start_merge_request_worker(merge_request_id)
+ end
+
+ it { is_expected.to be(true) }
+
+ context 'with feature disabled' do
+ before do
+ stub_feature_flags(security_policy_sync_propagation_tracking: false)
+
+ state.clear_memoization(:feature_disabled) # presumably memoized by the hooks above
+ end
+
+ it { is_expected.to be(false) }
+ end
+
+ context 'with all projects processed' do
+ before do
+ state.finish_project(project_id)
+ end
+
+ it { is_expected.to be(true) } # the merge request worker has not finished
+ end
+
+ context 'with all merge request processed' do
+ before do
+ state.finish_merge_request_worker(merge_request_id)
+ end
+
+ it { is_expected.to be(true) } # the project has not finished
+ end
+
+ context 'with all projects and merge requests processed' do
+ before do
+ state.finish_project(project_id)
+ state.finish_merge_request_worker(merge_request_id)
+ end
+
+ it { is_expected.to be(false) } # both the project and the merge request are done
+ end
+ end
+ end
+
+ describe '#clear' do # empties both pending sets unless the feature flag is off
+ subject(:clear) { state.clear }
+
+ before do # seed one pending project and one pending merge request
+ state.append_projects([project_id])
+ state.start_merge_request(merge_request_id)
+ end
+
+ it 'resets pending project IDs' do
+ expect { clear }.to change { state.pending_projects }.from(contain_exactly(project_id.to_s)).to(be_empty)
+ end
+
+ it 'resets pending merge request IDs' do
+ expect { clear }.to change {
+ state.pending_merge_requests
+ }.from(contain_exactly(merge_request_id.to_s)).to(be_empty)
+ end
+
+ context 'with feature disabled' do
+ before do
+ stub_feature_flags(security_policy_sync_propagation_tracking: false)
+
+ state.clear_memoization(:feature_disabled) # presumably memoized by the hooks above
+ end
+
+ it 'does not reset pending project IDs' do
+ expect { clear }.not_to change { state.pending_projects }.from(contain_exactly(project_id.to_s))
+ end
+
+ it 'does not reset pending merge request IDs' do
+ expect { clear }.not_to change {
+ state.pending_merge_requests
+ }.from(contain_exactly(merge_request_id.to_s)) # the seeded ID is still pending
+ end
+ end
+ end
+
+ describe '#trigger_subscription' do
+ let_it_be(:policy_configuration) { create(:security_orchestration_policy_configuration, :namespace) }
+
+ subject(:state) { described_class.new(policy_configuration.id) }
+
+ before do
+ 10.times do |i|
+ state.append_projects([i])
+ state.start_merge_request(i)
+ state.start_merge_request_worker(i)
+ end
+ end
+
+ it 'publishes with correct values at each step' do
+ expect(GraphqlTriggers).to receive(:security_policies_sync_updated).with(
+ policy_configuration,
+ 10, # project progress: (10 total - 9 pending) / 10 * 100 = 10%
+ 10, # projects total
+ [], # no failed projects yet
+ 0, # merge request progress: (10 total - 10 pending) / 10 * 100 = 0%
+ 10 # merge requests total
+ ).ordered
+ state.finish_project(1)
+
+ expect(GraphqlTriggers).to receive(:security_policies_sync_updated).with(
+ policy_configuration,
+ 10, # project progress: still 10%
+ 10, # projects total
+ [], # no failed projects yet
+ 10, # merge request progress: (10 total - 9 pending) / 10 * 100 = 10%
+ 10 # merge requests total
+ ).ordered
+ state.finish_merge_request_worker(1)
+
+ expect(GraphqlTriggers).to receive(:security_policies_sync_updated).with(
+ policy_configuration,
+ 20, # project progress: (10 total - 8 pending) / 10 * 100 = 20%
+ 10, # projects total
+ ["2"], # failed project 2
+ 10, # merge request progress: still 10%
+ 10 # merge requests total
+ ).ordered
+ state.fail_project(2)
+ end
+
+ it 'shows increasing progress as items complete' do
+ 5.times do |i|
+ allow(GraphqlTriggers).to receive(:security_policies_sync_updated)
+ state.finish_project(i)
+ end
+
+ expect(GraphqlTriggers).to have_received(:security_policies_sync_updated).with(
+ policy_configuration,
+ 50, # project progress: (10 - 5) / 10 * 100 = 50%
+ 10, # projects total
+ [], # no failed projects
+ 0, # merge request progress: still 0%
+ 10 # merge requests total
+ )
+ end
+ end
+end
diff --git a/ee/spec/support/helpers/graphql/subscriptions/security/policies_sync_updated/helper.rb b/ee/spec/support/helpers/graphql/subscriptions/security/policies_sync_updated/helper.rb
new file mode 100644
index 0000000000000000000000000000000000000000..87bd6464c6a3ce60fe6678b0923b1dcd58b9a7e1
--- /dev/null
+++ b/ee/spec/support/helpers/graphql/subscriptions/security/policies_sync_updated/helper.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+module Graphql
+ module Subscriptions
+ module Security
+ module PoliciesSyncUpdated
+ module Helper
+ def subscription_response
+ subscription_channel = subscribe
+ yield
+ subscription_channel.mock_broadcasted_messages.first
+ end
+
+ def security_policies_sync_updated_subscription(policy_configuration, current_user)
+ mock_channel = Graphql::Subscriptions::ActionCable::MockActionCable.get_mock_channel
+ query = security_policies_sync_updated_subscription_query(policy_configuration)
+
+ GitlabSchema.execute(query, context: { current_user: current_user, channel: mock_channel })
+
+ mock_channel
+ end
+
+ private
+
+ def security_policies_sync_updated_subscription_query(policy_configuration)
+ <<~SUBSCRIPTION
+ subscription {
+ securityPoliciesSyncUpdated(policyConfigurationId: \"#{policy_configuration.to_global_id}\") {
+ projectsProgress
+ projectsTotal
+ failedProjects
+ mergeRequestsProgress
+ mergeRequestsTotal
+ }
+ }
+ SUBSCRIPTION
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/spec/support/shared_examples/policies/policy_sync_state_shared_examples.rb b/spec/support/shared_examples/policies/policy_sync_state_shared_examples.rb
new file mode 100644
index 0000000000000000000000000000000000000000..115eca76faaceb00c682f35f3b8ca12e2deb1346
--- /dev/null
+++ b/spec/support/shared_examples/policies/policy_sync_state_shared_examples.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+RSpec.shared_context 'with policy sync state', :clean_gitlab_redis_shared_state do
+ let_it_be(:policy_configuration) { create(:security_orchestration_policy_configuration, :namespace) }
+
+ let(:policy_configuration_id) { policy_configuration.id }
+ let(:state) { Security::SecurityOrchestrationPolicies::PolicySyncState::State.new(policy_configuration_id) }
+
+ before do
+ allow(Gitlab::ApplicationContext).to receive(:current_context_attribute)
+ .and_return(policy_configuration_id.to_s)
+ end
+end