From 052dfa0564670a492b4336f6f3044b805fd90448 Mon Sep 17 00:00:00 2001 From: Furkan Ayhan Date: Wed, 1 Dec 2021 23:02:36 +0300 Subject: [PATCH 1/4] Implement syncing ci_project_mirrors and ci_namespace_mirrors tables These tables will be shadow tables from the main DB to CI DB. In this commit, we - create projects_sync_events and namespaces_sync_events tables - create triggers to populate events when changing namespace or project - introduce a FF ci_namespace_project_mirrors Changelog: other --- app/models/ci/namespace_mirror.rb | 29 +++- app/models/ci/project_mirror.rb | 9 +- app/models/namespace.rb | 11 ++ app/models/namespaces/sync_event.rb | 17 +++ app/models/project.rb | 11 ++ app/models/projects/sync_event.rb | 17 +++ .../ci/process_sync_events_service.rb | 43 ++++++ app/workers/all_queues.yml | 18 +++ .../namespaces/process_sync_events_worker.rb | 19 +++ .../projects/process_sync_events_worker.rb | 19 +++ .../ci_namespace_project_mirrors.yml | 8 ++ config/sidekiq_queues.yml | 4 + ...011140932_create_namespaces_sync_events.rb | 9 ++ ...11011141239_create_projects_sync_events.rb | 9 ++ ...11141242_create_namespaces_sync_trigger.rb | 37 +++++ ...1011141243_create_projects_sync_trigger.rb | 37 +++++ db/schema_migrations/20211011140932 | 1 + db/schema_migrations/20211011141239 | 1 + db/schema_migrations/20211011141242 | 1 + db/schema_migrations/20211011141243 | 1 + db/structure.sql | 78 +++++++++++ lib/gitlab/database/gitlab_schemas.yml | 2 + lib/gitlab/database/schema_helpers.rb | 1 + spec/models/ci/namespace_mirror_spec.rb | 94 +++++++++++++ spec/models/ci/project_mirror_spec.rb | 36 +++++ spec/models/namespace_spec.rb | 71 ++++++++++ spec/models/project_spec.rb | 71 ++++++++++ spec/models/user_spec.rb | 2 +- .../ci/process_sync_events_service_spec.rb | 129 ++++++++++++++++++ .../process_sync_events_worker_spec.rb | 32 +++++ .../process_sync_events_worker_spec.rb | 28 ++++ 31 files changed, 842 insertions(+), 3 deletions(-) create mode 100644 
app/models/namespaces/sync_event.rb create mode 100644 app/models/projects/sync_event.rb create mode 100644 app/services/ci/process_sync_events_service.rb create mode 100644 app/workers/namespaces/process_sync_events_worker.rb create mode 100644 app/workers/projects/process_sync_events_worker.rb create mode 100644 config/feature_flags/development/ci_namespace_project_mirrors.yml create mode 100644 db/migrate/20211011140932_create_namespaces_sync_events.rb create mode 100644 db/migrate/20211011141239_create_projects_sync_events.rb create mode 100644 db/migrate/20211011141242_create_namespaces_sync_trigger.rb create mode 100644 db/migrate/20211011141243_create_projects_sync_trigger.rb create mode 100644 db/schema_migrations/20211011140932 create mode 100644 db/schema_migrations/20211011141239 create mode 100644 db/schema_migrations/20211011141242 create mode 100644 db/schema_migrations/20211011141243 create mode 100644 spec/models/ci/namespace_mirror_spec.rb create mode 100644 spec/models/ci/project_mirror_spec.rb create mode 100644 spec/services/ci/process_sync_events_service_spec.rb create mode 100644 spec/workers/namespaces/process_sync_events_worker_spec.rb create mode 100644 spec/workers/projects/process_sync_events_worker_spec.rb diff --git a/app/models/ci/namespace_mirror.rb b/app/models/ci/namespace_mirror.rb index a497d2cabe5577..656b9ea8fe4e29 100644 --- a/app/models/ci/namespace_mirror.rb +++ b/app/models/ci/namespace_mirror.rb @@ -4,6 +4,33 @@ module Ci # This model represents a record in a shadow table of the main database's namespaces table. # It allows us to navigate the namespace hierarchy on the ci database without resorting to a JOIN. 
class NamespaceMirror < ApplicationRecord - # Will be filled by https://gitlab.com/gitlab-org/gitlab/-/merge_requests/75517 + belongs_to :namespace + + scope :contains_namespaces, -> (ids) { + where('traversal_ids @> ARRAY[?]::int[]', ids) # bind the Array itself so Rails expands it to a comma list; a joined String binds as one quoted value and breaks the ::int[] cast for 2+ ids + } + + class << self + def sync!(event) + traversal_ids = event.namespace.self_and_ancestor_ids(hierarchy_order: :desc) + + upsert({ namespace_id: event.namespace_id, traversal_ids: traversal_ids }, + unique_by: :namespace_id) + + # TODO: once the `sync_traversal_ids` FF is fully rolled out, we will not need this method. + # However, we also need to change the PG trigger to reflect `namespaces.traversal_ids` changes + sync_children_namespaces!(event.namespace_id, traversal_ids) + end + + private + + def sync_children_namespaces!(namespace_id, traversal_ids) # replaces the ancestor prefix (up to namespace_id) of every descendant mirror row with the new traversal_ids + contains_namespaces([namespace_id]) + .where.not(namespace_id: namespace_id) + .update_all( + "traversal_ids = ARRAY[#{traversal_ids.join(',')}]::int[] || traversal_ids[array_position(traversal_ids, #{namespace_id}) + 1:]" + ) + end + end end end diff --git a/app/models/ci/project_mirror.rb b/app/models/ci/project_mirror.rb index c6e3101fb3ab7c..d6aaa3f50c142c 100644 --- a/app/models/ci/project_mirror.rb +++ b/app/models/ci/project_mirror.rb @@ -4,6 +4,13 @@ module Ci # This model represents a shadow table of the main database's projects table. # It allows us to navigate the project and namespace hierarchy on the ci database.
class ProjectMirror < ApplicationRecord - # Will be filled by https://gitlab.com/gitlab-org/gitlab/-/merge_requests/75517 + belongs_to :project + + class << self + def sync!(event) # upserts this project's mirror row (unique by project_id) with the project's current namespace_id + upsert({ project_id: event.project_id, namespace_id: event.project.namespace_id }, + unique_by: :project_id) + end + end end end diff --git a/app/models/namespace.rb b/app/models/namespace.rb index 75e06ac2b86ee8..51d18b420e6422 100644 --- a/app/models/namespace.rb +++ b/app/models/namespace.rb @@ -64,6 +64,9 @@ class Namespace < ApplicationRecord has_one :admin_note, inverse_of: :namespace accepts_nested_attributes_for :admin_note, update_only: true + has_one :ci_namespace_mirror, class_name: 'Ci::NamespaceMirror' + has_many :sync_events, class_name: 'Namespaces::SyncEvent' + validates :owner, presence: true, if: ->(n) { n.owner_required? } validates :name, presence: true, @@ -104,6 +107,8 @@ class Namespace < ApplicationRecord delegate :name, to: :owner, allow_nil: true, prefix: true delegate :avatar_url, to: :owner, allow_nil: true + after_save :schedule_sync_event_worker, if: -> { saved_change_to_id? || saved_change_to_parent_id? } # same conditions as the namespaces sync-event DB triggers (insert, parent_id change) + after_commit :refresh_access_of_projects_invited_groups, on: :update, if: -> { previous_changes.key?('share_with_group_lock') } before_create :sync_share_with_group_lock_with_parent @@ -613,6 +618,12 @@ def write_projects_repository_config def enforce_minimum_path_length? path_changed? && !project_namespace?
end + + def schedule_sync_event_worker + run_after_commit do # the sync-event row is inserted by a DB trigger within the saving transaction, so enqueue only after it commits + Namespaces::SyncEvent.enqueue_worker + end + end end Namespace.prepend_mod_with('Namespace') diff --git a/app/models/namespaces/sync_event.rb b/app/models/namespaces/sync_event.rb new file mode 100644 index 00000000000000..e1aa3a1788306b --- /dev/null +++ b/app/models/namespaces/sync_event.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +# This model serves to keep track of changes to the namespaces table in the main database, allowing these changes to be +# safely replicated to other databases. +class Namespaces::SyncEvent < ApplicationRecord + self.table_name = 'namespaces_sync_events' + + belongs_to :namespace + + scope :preload_synced_relation, -> { preload(:namespace) } + scope :order_by_id_asc, -> { order(id: :asc) } + scope :for_id, -> (id) { where(id: id) } + + def self.enqueue_worker + ::Namespaces::ProcessSyncEventsWorker.perform_async # rubocop:disable CodeReuse/Worker + end +end diff --git a/app/models/project.rb b/app/models/project.rb index 6d35534bb6a635..a6cbf478e293ba 100644 --- a/app/models/project.rb +++ b/app/models/project.rb @@ -102,6 +102,8 @@ class Project < ApplicationRecord after_save :update_project_statistics, if: :saved_change_to_namespace_id? + after_save :schedule_sync_event_worker, if: -> { saved_change_to_id? || saved_change_to_namespace_id? } # same conditions as the projects sync-event DB triggers (insert, namespace_id change) + after_save :create_import_state, if: ->(project) { project.import? && project.import_state.nil?
} after_save :save_topics @@ -394,6 +396,9 @@ def self.integration_association_name(name) has_many :timelogs + has_one :ci_project_mirror, class_name: 'Ci::ProjectMirror' + has_many :sync_events, class_name: 'Projects::SyncEvent' + accepts_nested_attributes_for :variables, allow_destroy: true accepts_nested_attributes_for :project_feature, update_only: true accepts_nested_attributes_for :project_setting, update_only: true @@ -2932,6 +2937,12 @@ def sync_attributes(project_namespace) project_namespace.shared_runners_enabled = shared_runners_enabled project_namespace.visibility_level = visibility_level end + + def schedule_sync_event_worker + run_after_commit do # the sync-event row is inserted by a DB trigger within the saving transaction, so enqueue only after it commits + Projects::SyncEvent.enqueue_worker + end + end end Project.prepend_mod_with('Project') diff --git a/app/models/projects/sync_event.rb b/app/models/projects/sync_event.rb new file mode 100644 index 00000000000000..46016733abb496 --- /dev/null +++ b/app/models/projects/sync_event.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +# This model serves to keep track of changes to the projects table in the main database (new projects and namespace moves),
+class Projects::SyncEvent < ApplicationRecord + self.table_name = 'projects_sync_events' + + belongs_to :project + + scope :preload_synced_relation, -> { preload(:project) } + scope :order_by_id_asc, -> { order(id: :asc) } + scope :for_id, -> (id) { where(id: id) } + + def self.enqueue_worker + ::Projects::ProcessSyncEventsWorker.perform_async # rubocop:disable CodeReuse/Worker + end +end diff --git a/app/services/ci/process_sync_events_service.rb b/app/services/ci/process_sync_events_service.rb new file mode 100644 index 00000000000000..f36d849a3f63ea --- /dev/null +++ b/app/services/ci/process_sync_events_service.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +module Ci + class ProcessSyncEventsService + include Gitlab::Utils::StrongMemoize + + BATCH_SIZE = 1000 + + def initialize(sync_event_class, sync_class) + @sync_event_class = sync_event_class + @sync_class = sync_class + end + + def execute + return unless ::Feature.enabled?(:ci_namespace_project_mirrors, default_enabled: :yaml) + + process_events + enqueue_worker_if_there_still_event + end + + private + + def process_events + events = @sync_event_class + .preload_synced_relation + .order_by_id_asc + .limit(BATCH_SIZE) + .to_a + + return if events.empty? + + min = events[0] + max = events[-1] + + events.each { |event| @sync_class.sync!(event) } + @sync_event_class.for_id(min.id..max.id).delete_all + end + + def enqueue_worker_if_there_still_event + @sync_event_class.enqueue_worker if @sync_event_class.exists? 
+ end + end +end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 89a4d9dc7cfee8..e84646047e49e7 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -2492,6 +2492,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: namespaces_process_sync_events + :worker_name: Namespaces::ProcessSyncEventsWorker + :feature_category: :sharding + :has_external_dependencies: + :urgency: :high + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: new_issue :worker_name: NewIssueWorker :feature_category: :team_planning @@ -2663,6 +2672,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: projects_process_sync_events + :worker_name: Projects::ProcessSyncEventsWorker + :feature_category: :sharding + :has_external_dependencies: + :urgency: :high + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: projects_schedule_bulk_repository_shard_moves :worker_name: Projects::ScheduleBulkRepositoryShardMovesWorker :feature_category: :gitaly diff --git a/app/workers/namespaces/process_sync_events_worker.rb b/app/workers/namespaces/process_sync_events_worker.rb new file mode 100644 index 00000000000000..80985e13fa7cc3 --- /dev/null +++ b/app/workers/namespaces/process_sync_events_worker.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Namespaces + class ProcessSyncEventsWorker + include ApplicationWorker + + data_consistency :always + + feature_category :sharding + urgency :high + + deduplicate :until_executed + idempotent! 
+ + def perform # runs one Ci::ProcessSyncEventsService pass from Namespaces::SyncEvent rows into Ci::NamespaceMirror + ::Ci::ProcessSyncEventsService.new(::Namespaces::SyncEvent, ::Ci::NamespaceMirror).execute + end + end +end diff --git a/app/workers/projects/process_sync_events_worker.rb b/app/workers/projects/process_sync_events_worker.rb new file mode 100644 index 00000000000000..40242059550f3d --- /dev/null +++ b/app/workers/projects/process_sync_events_worker.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Projects + class ProcessSyncEventsWorker + include ApplicationWorker + + data_consistency :always + + feature_category :sharding + urgency :high + + deduplicate :until_executed # NOTE(review): with :until_executed, a job enqueued while this one is still running (including the service's own re-enqueue) may be dropped as a duplicate - confirm; if_deduplicated: :reschedule_once may be needed + idempotent! + + def perform # runs one Ci::ProcessSyncEventsService pass from Projects::SyncEvent rows into Ci::ProjectMirror + ::Ci::ProcessSyncEventsService.new(::Projects::SyncEvent, ::Ci::ProjectMirror).execute + end + end +end diff --git a/config/feature_flags/development/ci_namespace_project_mirrors.yml b/config/feature_flags/development/ci_namespace_project_mirrors.yml new file mode 100644 index 00000000000000..a2d674c37707c5 --- /dev/null +++ b/config/feature_flags/development/ci_namespace_project_mirrors.yml @@ -0,0 +1,8 @@ +--- +name: ci_namespace_project_mirrors +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/75517 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/346786 +milestone: '14.6' +type: development +group: group::sharding +default_enabled: false diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 7e6bcc35e60170..77ff9a36c4f0f6 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -271,6 +271,8 @@ - 1 - - namespaces_onboarding_user_added - 1 +- - namespaces_process_sync_events + - 1 - - namespaces_sync_namespace_name - 1 - - new_epic @@ -337,6 +339,8 @@ - 1 - - projects_post_creation - 1 +- - projects_process_sync_events + - 1 - - projects_schedule_bulk_repository_shard_moves - 1 - - projects_update_repository_storage diff --git a/db/migrate/20211011140932_create_namespaces_sync_events.rb b/db/migrate/20211011140932_create_namespaces_sync_events.rb new file mode
100644 index 00000000000000..06831423343767 --- /dev/null +++ b/db/migrate/20211011140932_create_namespaces_sync_events.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +class CreateNamespacesSyncEvents < Gitlab::Database::Migration[1.0] + def change + create_table :namespaces_sync_events do |t| + t.references :namespace, null: false, index: true, foreign_key: { on_delete: :cascade } + end + end +end diff --git a/db/migrate/20211011141239_create_projects_sync_events.rb b/db/migrate/20211011141239_create_projects_sync_events.rb new file mode 100644 index 00000000000000..50fe988ac1b900 --- /dev/null +++ b/db/migrate/20211011141239_create_projects_sync_events.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +class CreateProjectsSyncEvents < Gitlab::Database::Migration[1.0] + def change + create_table :projects_sync_events do |t| + t.references :project, null: false, index: true, foreign_key: { on_delete: :cascade } + end + end +end diff --git a/db/migrate/20211011141242_create_namespaces_sync_trigger.rb b/db/migrate/20211011141242_create_namespaces_sync_trigger.rb new file mode 100644 index 00000000000000..91f64709f28680 --- /dev/null +++ b/db/migrate/20211011141242_create_namespaces_sync_trigger.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +class CreateNamespacesSyncTrigger < Gitlab::Database::Migration[1.0] + include Gitlab::Database::SchemaHelpers + + enable_lock_retries!
+ + TABLE_NAME = 'namespaces' + EVENT_TABLE_NAME = 'namespaces_sync_events' + FUNCTION_NAME = 'insert_namespaces_sync_event' + TRIGGER_ON_INSERT = 'trigger_namespaces_parent_id_on_insert' + TRIGGER_ON_UPDATE = 'trigger_namespaces_parent_id_on_update' + + def up + create_trigger_function(FUNCTION_NAME) do + <<~SQL + INSERT INTO #{EVENT_TABLE_NAME} (namespace_id) + VALUES(COALESCE(NEW.id, OLD.id)); + RETURN NULL; + SQL + end + + create_trigger(TABLE_NAME, TRIGGER_ON_INSERT, FUNCTION_NAME, fires: 'AFTER INSERT') + + create_trigger(TABLE_NAME, TRIGGER_ON_UPDATE, FUNCTION_NAME, fires: 'AFTER UPDATE') do # the block's SQL is spliced in before EXECUTE FUNCTION as the trigger's WHEN clause + <<~SQL + WHEN (OLD.parent_id IS DISTINCT FROM NEW.parent_id) + SQL + end + end + + def down + drop_trigger(TABLE_NAME, TRIGGER_ON_INSERT) + drop_trigger(TABLE_NAME, TRIGGER_ON_UPDATE) + drop_function(FUNCTION_NAME) + end +end diff --git a/db/migrate/20211011141243_create_projects_sync_trigger.rb b/db/migrate/20211011141243_create_projects_sync_trigger.rb new file mode 100644 index 00000000000000..03b31c35a3a236 --- /dev/null +++ b/db/migrate/20211011141243_create_projects_sync_trigger.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +class CreateProjectsSyncTrigger < Gitlab::Database::Migration[1.0] + include Gitlab::Database::SchemaHelpers + + enable_lock_retries!
+ + TABLE_NAME = 'projects' + EVENT_TABLE_NAME = 'projects_sync_events' + FUNCTION_NAME = 'insert_projects_sync_event' + TRIGGER_ON_INSERT = 'trigger_projects_parent_id_on_insert' # for projects, the "parent" tracked is namespace_id + TRIGGER_ON_UPDATE = 'trigger_projects_parent_id_on_update' # for projects, the "parent" tracked is namespace_id + + def up + create_trigger_function(FUNCTION_NAME) do + <<~SQL + INSERT INTO #{EVENT_TABLE_NAME} (project_id) + VALUES(COALESCE(NEW.id, OLD.id)); + RETURN NULL; + SQL + end + + create_trigger(TABLE_NAME, TRIGGER_ON_INSERT, FUNCTION_NAME, fires: 'AFTER INSERT') + + create_trigger(TABLE_NAME, TRIGGER_ON_UPDATE, FUNCTION_NAME, fires: 'AFTER UPDATE') do # the block's SQL is spliced in before EXECUTE FUNCTION as the trigger's WHEN clause + <<~SQL + WHEN (OLD.namespace_id IS DISTINCT FROM NEW.namespace_id) + SQL + end + end + + def down + drop_trigger(TABLE_NAME, TRIGGER_ON_INSERT) + drop_trigger(TABLE_NAME, TRIGGER_ON_UPDATE) + drop_function(FUNCTION_NAME) + end +end diff --git a/db/schema_migrations/20211011140932 b/db/schema_migrations/20211011140932 new file mode 100644 index 00000000000000..af0e000b9f3e2a --- /dev/null +++ b/db/schema_migrations/20211011140932 @@ -0,0 +1 @@ +0209db1e7be48bcbf0e52b451d37da0ef2ecadd567cdfa47907fc5032c258a27 \ No newline at end of file diff --git a/db/schema_migrations/20211011141239 b/db/schema_migrations/20211011141239 new file mode 100644 index 00000000000000..f215f234a7e1a5 --- /dev/null +++ b/db/schema_migrations/20211011141239 @@ -0,0 +1 @@ +bc0ae055b331801fbe020c12a66e4e6ae790780121bfd66fd161093c94c7a84a \ No newline at end of file diff --git a/db/schema_migrations/20211011141242 b/db/schema_migrations/20211011141242 new file mode 100644 index 00000000000000..01d082a4bc892b --- /dev/null +++ b/db/schema_migrations/20211011141242 @@ -0,0 +1 @@ +9fd4977cdb57df827fe1a01f55a305d832ee4240d40af9396e093e3b4dbd1e33 \ No newline at end of file diff --git a/db/schema_migrations/20211011141243 b/db/schema_migrations/20211011141243 new file mode 100644 index 00000000000000..cb2df22b8d7298 --- /dev/null +++ b/db/schema_migrations/20211011141243 @@ -0,0 +1 @@
+b3ce6aa41c70cdcf8637a94c3d4d4e97730899221530f5507c4581aaf2fc3a6c \ No newline at end of file diff --git a/db/structure.sql b/db/structure.sql index cfe07cac82b131..bc38b3f3b447b8 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -34,6 +34,28 @@ BEGIN END $$; +CREATE FUNCTION insert_namespaces_sync_event() RETURNS trigger + LANGUAGE plpgsql + AS $$ +BEGIN +INSERT INTO namespaces_sync_events (namespace_id) +VALUES(COALESCE(NEW.id, OLD.id)); +RETURN NULL; + +END +$$; + +CREATE FUNCTION insert_projects_sync_event() RETURNS trigger + LANGUAGE plpgsql + AS $$ +BEGIN +INSERT INTO projects_sync_events (project_id) +VALUES(COALESCE(NEW.id, OLD.id)); +RETURN NULL; + +END +$$; + CREATE FUNCTION integrations_set_type_new() RETURNS trigger LANGUAGE plpgsql AS $$ @@ -16473,6 +16495,20 @@ CREATE SEQUENCE namespaces_id_seq ALTER SEQUENCE namespaces_id_seq OWNED BY namespaces.id; +CREATE TABLE namespaces_sync_events ( + id bigint NOT NULL, + namespace_id bigint NOT NULL +); + +CREATE SEQUENCE namespaces_sync_events_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + +ALTER SEQUENCE namespaces_sync_events_id_seq OWNED BY namespaces_sync_events.id; + CREATE TABLE note_diff_files ( id integer NOT NULL, diff_note_id integer NOT NULL, @@ -18527,6 +18563,20 @@ CREATE SEQUENCE projects_id_seq ALTER SEQUENCE projects_id_seq OWNED BY projects.id; +CREATE TABLE projects_sync_events ( + id bigint NOT NULL, + project_id bigint NOT NULL +); + +CREATE SEQUENCE projects_sync_events_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + +ALTER SEQUENCE projects_sync_events_id_seq OWNED BY projects_sync_events.id; + CREATE TABLE prometheus_alert_events ( id bigint NOT NULL, project_id integer NOT NULL, @@ -21788,6 +21838,8 @@ ALTER TABLE ONLY namespace_statistics ALTER COLUMN id SET DEFAULT nextval('names ALTER TABLE ONLY namespaces ALTER COLUMN id SET DEFAULT nextval('namespaces_id_seq'::regclass); +ALTER TABLE ONLY namespaces_sync_events 
ALTER COLUMN id SET DEFAULT nextval('namespaces_sync_events_id_seq'::regclass); + ALTER TABLE ONLY note_diff_files ALTER COLUMN id SET DEFAULT nextval('note_diff_files_id_seq'::regclass); ALTER TABLE ONLY notes ALTER COLUMN id SET DEFAULT nextval('notes_id_seq'::regclass); @@ -21938,6 +21990,8 @@ ALTER TABLE ONLY project_tracing_settings ALTER COLUMN id SET DEFAULT nextval('p ALTER TABLE ONLY projects ALTER COLUMN id SET DEFAULT nextval('projects_id_seq'::regclass); +ALTER TABLE ONLY projects_sync_events ALTER COLUMN id SET DEFAULT nextval('projects_sync_events_id_seq'::regclass); + ALTER TABLE ONLY prometheus_alert_events ALTER COLUMN id SET DEFAULT nextval('prometheus_alert_events_id_seq'::regclass); ALTER TABLE ONLY prometheus_alerts ALTER COLUMN id SET DEFAULT nextval('prometheus_alerts_id_seq'::regclass); @@ -23556,6 +23610,9 @@ ALTER TABLE ONLY namespace_statistics ALTER TABLE ONLY namespaces ADD CONSTRAINT namespaces_pkey PRIMARY KEY (id); +ALTER TABLE ONLY namespaces_sync_events + ADD CONSTRAINT namespaces_sync_events_pkey PRIMARY KEY (id); + ALTER TABLE ONLY note_diff_files ADD CONSTRAINT note_diff_files_pkey PRIMARY KEY (id); @@ -23826,6 +23883,9 @@ ALTER TABLE ONLY project_tracing_settings ALTER TABLE ONLY projects ADD CONSTRAINT projects_pkey PRIMARY KEY (id); +ALTER TABLE ONLY projects_sync_events + ADD CONSTRAINT projects_sync_events_pkey PRIMARY KEY (id); + ALTER TABLE ONLY prometheus_alert_events ADD CONSTRAINT prometheus_alert_events_pkey PRIMARY KEY (id); @@ -26712,6 +26772,8 @@ CREATE INDEX index_namespaces_on_type_and_id ON namespaces USING btree (type, id CREATE INDEX index_namespaces_public_groups_name_id ON namespaces USING btree (name, id) WHERE (((type)::text = 'Group'::text) AND (visibility_level = 20)); +CREATE INDEX index_namespaces_sync_events_on_namespace_id ON namespaces_sync_events USING btree (namespace_id); + CREATE INDEX index_non_requested_project_members_on_source_id_and_type ON members USING btree (source_id, source_type) WHERE 
((requested_at IS NULL) AND ((type)::text = 'ProjectMember'::text)); CREATE UNIQUE INDEX index_note_diff_files_on_diff_note_id ON note_diff_files USING btree (diff_note_id); @@ -27170,6 +27232,8 @@ CREATE INDEX index_projects_on_star_count ON projects USING btree (star_count); CREATE INDEX index_projects_on_updated_at_and_id ON projects USING btree (updated_at, id); +CREATE INDEX index_projects_sync_events_on_project_id ON projects_sync_events USING btree (project_id); + CREATE UNIQUE INDEX index_prometheus_alert_event_scoped_payload_key ON prometheus_alert_events USING btree (prometheus_alert_id, payload_key); CREATE INDEX index_prometheus_alert_events_on_project_id_and_status ON prometheus_alert_events USING btree (project_id, status); @@ -28924,6 +28988,14 @@ CREATE TRIGGER trigger_has_external_wiki_on_type_new_updated AFTER UPDATE OF typ CREATE TRIGGER trigger_has_external_wiki_on_update AFTER UPDATE ON integrations FOR EACH ROW WHEN (((new.type_new = 'Integrations::ExternalWiki'::text) AND (old.active <> new.active) AND (new.project_id IS NOT NULL))) EXECUTE FUNCTION set_has_external_wiki(); +CREATE TRIGGER trigger_namespaces_parent_id_on_insert AFTER INSERT ON namespaces FOR EACH ROW EXECUTE FUNCTION insert_namespaces_sync_event(); + +CREATE TRIGGER trigger_namespaces_parent_id_on_update AFTER UPDATE ON namespaces FOR EACH ROW WHEN ((old.parent_id IS DISTINCT FROM new.parent_id)) EXECUTE FUNCTION insert_namespaces_sync_event(); + +CREATE TRIGGER trigger_projects_parent_id_on_insert AFTER INSERT ON projects FOR EACH ROW EXECUTE FUNCTION insert_projects_sync_event(); + +CREATE TRIGGER trigger_projects_parent_id_on_update AFTER UPDATE ON projects FOR EACH ROW WHEN ((old.namespace_id IS DISTINCT FROM new.namespace_id)) EXECUTE FUNCTION insert_projects_sync_event(); + CREATE TRIGGER trigger_type_new_on_insert AFTER INSERT ON integrations FOR EACH ROW EXECUTE FUNCTION integrations_set_type_new(); ALTER TABLE ONLY chat_names @@ -30828,6 +30900,9 @@ ALTER TABLE ONLY 
gpg_keys ALTER TABLE ONLY analytics_language_trend_repository_languages ADD CONSTRAINT fk_rails_9d851d566c FOREIGN KEY (programming_language_id) REFERENCES programming_languages(id) ON DELETE CASCADE; +ALTER TABLE ONLY namespaces_sync_events + ADD CONSTRAINT fk_rails_9da32a0431 FOREIGN KEY (namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE; + ALTER TABLE ONLY badges ADD CONSTRAINT fk_rails_9df4a56538 FOREIGN KEY (group_id) REFERENCES namespaces(id) ON DELETE CASCADE; @@ -31002,6 +31077,9 @@ ALTER TABLE ONLY security_findings ALTER TABLE ONLY packages_debian_project_component_files ADD CONSTRAINT fk_rails_bbe9ebfbd9 FOREIGN KEY (component_id) REFERENCES packages_debian_project_components(id) ON DELETE RESTRICT; +ALTER TABLE ONLY projects_sync_events + ADD CONSTRAINT fk_rails_bbf0eef59f FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE; + ALTER TABLE ONLY approval_merge_request_rules_users ADD CONSTRAINT fk_rails_bc8972fa55 FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE; diff --git a/lib/gitlab/database/gitlab_schemas.yml b/lib/gitlab/database/gitlab_schemas.yml index 4a7db516b70f90..e0848f34930561 100644 --- a/lib/gitlab/database/gitlab_schemas.yml +++ b/lib/gitlab/database/gitlab_schemas.yml @@ -319,6 +319,7 @@ namespace_package_settings: :gitlab_main namespace_root_storage_statistics: :gitlab_main namespace_settings: :gitlab_main namespaces: :gitlab_main +namespaces_sync_events: :gitlab_main namespace_statistics: :gitlab_main note_diff_files: :gitlab_main notes: :gitlab_main @@ -413,6 +414,7 @@ project_repository_storage_moves: :gitlab_main project_security_settings: :gitlab_main project_settings: :gitlab_main projects: :gitlab_main +projects_sync_events: :gitlab_main project_statistics: :gitlab_main project_topics: :gitlab_main project_tracing_settings: :gitlab_main diff --git a/lib/gitlab/database/schema_helpers.rb b/lib/gitlab/database/schema_helpers.rb index 3d929c62933ff6..9ddc5391689b08 100644 --- 
a/lib/gitlab/database/schema_helpers.rb +++ b/lib/gitlab/database/schema_helpers.rb @@ -25,6 +25,7 @@ def create_trigger(table_name, name, function_name, fires:) CREATE TRIGGER #{name} #{fires} ON #{table_name} FOR EACH ROW + #{yield if block_given?} EXECUTE FUNCTION #{function_name}() SQL end diff --git a/spec/models/ci/namespace_mirror_spec.rb b/spec/models/ci/namespace_mirror_spec.rb new file mode 100644 index 00000000000000..24620b70d3f619 --- /dev/null +++ b/spec/models/ci/namespace_mirror_spec.rb @@ -0,0 +1,94 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Ci::NamespaceMirror do + let!(:group1) { create(:group) } + let!(:group2) { create(:group, parent: group1) } + let!(:group3) { create(:group, parent: group2) } + let!(:group4) { create(:group, parent: group3) } + + describe '.sync!' do + let!(:event) { Namespaces::SyncEvent.create!(namespace: namespace) } + + subject(:sync) { described_class.sync!(event.reload) } + + context 'when namespace hierarchy does not exist in the first place' do + let(:namespace) { group3 } + + it 'creates the hierarchy' do + expect { sync }.to change { described_class.count }.from(0).to(1) + + expect(namespace.ci_namespace_mirror).to have_attributes(traversal_ids: [group1.id, group2.id, group3.id]) + end + end + + context 'when namespace hierarchy does already exist' do + let(:namespace) { group3 } + + before do + described_class.create!(namespace: namespace, traversal_ids: [namespace.id]) + end + + it 'updates the hierarchy' do + expect { sync }.not_to change { described_class.count } + + expect(namespace.ci_namespace_mirror).to have_attributes(traversal_ids: [group1.id, group2.id, group3.id]) + end + end + + # I did not extract this context to a `shared_context` because the behavior will change + # after implementing the TODO in `Ci::NamespaceMirror.sync!` + context 'changing the middle namespace' do + let(:namespace) { group2 } + + before do + described_class.create!(namespace_id: group1.id,
traversal_ids: [group1.id]) + described_class.create!(namespace_id: group2.id, traversal_ids: [group1.id, group2.id]) + described_class.create!(namespace_id: group3.id, traversal_ids: [group1.id, group2.id, group3.id]) + described_class.create!(namespace_id: group4.id, traversal_ids: [group1.id, group2.id, group3.id, group4.id]) + + group2.update!(parent: nil) + end + + it 'updates hierarchies for the base and descendants' do + expect { sync }.not_to change { described_class.count } + + expect(group1.reload.ci_namespace_mirror).to have_attributes(traversal_ids: [group1.id]) + expect(group2.reload.ci_namespace_mirror).to have_attributes(traversal_ids: [group2.id]) + expect(group3.reload.ci_namespace_mirror).to have_attributes(traversal_ids: [group2.id, group3.id]) + expect(group4.reload.ci_namespace_mirror).to have_attributes(traversal_ids: [group2.id, group3.id, group4.id]) + end + end + + context 'when the FFs sync_traversal_ids, use_traversal_ids and use_traversal_ids_for_ancestors are disabled' do + before do + stub_feature_flags(sync_traversal_ids: false, + use_traversal_ids: false, + use_traversal_ids_for_ancestors: false) + end + + context 'changing the middle namespace' do + let(:namespace) { group2 } + + before do + described_class.create!(namespace_id: group1.id, traversal_ids: [group1.id]) + described_class.create!(namespace_id: group2.id, traversal_ids: [group1.id, group2.id]) + described_class.create!(namespace_id: group3.id, traversal_ids: [group1.id, group2.id, group3.id]) + described_class.create!(namespace_id: group4.id, traversal_ids: [group1.id, group2.id, group3.id, group4.id]) + + group2.update!(parent: nil) + end + + it 'updates hierarchies for the base and descendants' do + expect { sync }.not_to change { described_class.count } + + expect(group1.reload.ci_namespace_mirror).to have_attributes(traversal_ids: [group1.id]) + expect(group2.reload.ci_namespace_mirror).to have_attributes(traversal_ids: [group2.id]) +
expect(group3.reload.ci_namespace_mirror).to have_attributes(traversal_ids: [group2.id, group3.id]) + expect(group4.reload.ci_namespace_mirror).to have_attributes(traversal_ids: [group2.id, group3.id, group4.id]) + end + end + end + end +end diff --git a/spec/models/ci/project_mirror_spec.rb b/spec/models/ci/project_mirror_spec.rb new file mode 100644 index 00000000000000..199285b036c3ec --- /dev/null +++ b/spec/models/ci/project_mirror_spec.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Ci::ProjectMirror do + let_it_be(:group1) { create(:group) } + let_it_be(:group2) { create(:group) } + + let!(:project) { create(:project, namespace: group2) } + + describe '.sync!' do + let!(:event) { Projects::SyncEvent.create!(project: project) } + + subject(:sync) { described_class.sync!(event.reload) } + + context 'when project hierarchy does not exist in the first place' do + it 'creates a ci_project_mirrors record' do + expect { sync }.to change { described_class.count }.from(0).to(1) + + expect(project.ci_project_mirror).to have_attributes(namespace_id: group2.id) + end + end + + context 'when project hierarchy does already exist' do + before do + described_class.create!(project_id: project.id, namespace_id: group1.id) + end + + it 'updates the related ci_project_mirrors record' do + expect { sync }.not_to change { described_class.count } + + expect(project.ci_project_mirror).to have_attributes(namespace_id: group2.id) + end + end + end +end diff --git a/spec/models/namespace_spec.rb b/spec/models/namespace_spec.rb index 0130dd7da4b309..54327fc70d9229 100644 --- a/spec/models/namespace_spec.rb +++ b/spec/models/namespace_spec.rb @@ -2059,4 +2059,75 @@ def project_rugged(project) it_behaves_like 'it has loose foreign keys' do let(:factory_name) { :group } end + + context 'Namespaces::SyncEvent' do + let!(:namespace) { create(:group) } + + let_it_be(:new_namespace1) { create(:group) } + let_it_be(:new_namespace2) { create(:group) } + + context
'when creating the namespace' do + it 'creates a namespaces_sync_event record' do + expect(namespace.sync_events.count).to eq(1) + end + + it 'enqueues ProcessSyncEventsWorker' do + expect(Namespaces::ProcessSyncEventsWorker).to receive(:perform_async) + + create(:namespace) + end + end + + context 'when updating namespace parent_id' do + it 'creates a namespaces_sync_event record' do + expect do + namespace.update!(parent_id: new_namespace1.id) + end.to change(Namespaces::SyncEvent, :count).by(1) + + expect(namespace.sync_events.count).to eq(2) + end + + it 'enqueues ProcessSyncEventsWorker' do + expect(Namespaces::ProcessSyncEventsWorker).to receive(:perform_async) + + namespace.update!(parent_id: new_namespace1.id) + end + end + + context 'when updating namespace other attribute' do + it 'does not create a namespaces_sync_event record' do + expect do + namespace.update!(name: 'hello') + end.not_to change(Namespaces::SyncEvent, :count) + end + end + + context 'in the same transaction' do + context 'when updating different parent_id' do + it 'creates two namespaces_sync_event records' do + expect do + Namespace.transaction do + namespace.update!(parent_id: new_namespace1.id) + namespace.update!(parent_id: new_namespace2.id) + end + end.to change(Namespaces::SyncEvent, :count).by(2) + + expect(namespace.sync_events.count).to eq(3) + end + end + + context 'when updating the same parent_id' do + it 'creates one namespaces_sync_event record' do + expect do + Namespace.transaction do + namespace.update!(parent_id: new_namespace1.id) + namespace.update!(parent_id: new_namespace1.id) + end + end.to change(Namespaces::SyncEvent, :count).by(1) + + expect(namespace.sync_events.count).to eq(2) + end + end + end + end end diff --git a/spec/models/project_spec.rb b/spec/models/project_spec.rb index 20fec9c21a2962..5d6a2df147cd82 100644 --- a/spec/models/project_spec.rb +++ b/spec/models/project_spec.rb @@ -7457,6 +7457,77 @@ def has_external_wiki let(:factory_name) { :project } end +
context 'Projects::SyncEvent' do + let!(:project) { create(:project) } + + let_it_be(:new_namespace1) { create(:namespace) } + let_it_be(:new_namespace2) { create(:namespace) } + + context 'when creating the project' do + it 'creates a projects_sync_event record' do + expect(project.sync_events.count).to eq(1) + end + + it 'enqueues ProcessProjectSyncEventsWorker' do + expect(Projects::ProcessSyncEventsWorker).to receive(:perform_async) + + create(:project) + end + end + + context 'when updating project namespace_id' do + it 'creates a projects_sync_event record' do + expect do + project.update!(namespace_id: new_namespace1.id) + end.to change(Projects::SyncEvent, :count).by(1) + + expect(project.sync_events.count).to eq(2) + end + + it 'enqueues ProcessProjectSyncEventsWorker' do + expect(Projects::ProcessSyncEventsWorker).to receive(:perform_async) + + project.update!(namespace_id: new_namespace1.id) + end + end + + context 'when updating project other attribute' do + it 'creates a projects_sync_event record' do + expect do + project.update!(name: 'hello') + end.not_to change(Projects::SyncEvent, :count) + end + end + + context 'in the same transaction' do + context 'when updating different namespace_id' do + it 'creates two projects_sync_event records' do + expect do + Project.transaction do + project.update!(namespace_id: new_namespace1.id) + project.update!(namespace_id: new_namespace2.id) + end + end.to change(Projects::SyncEvent, :count).by(2) + + expect(project.sync_events.count).to eq(3) + end + end + + context 'when updating the same namespace_id' do + it 'creates one projects_sync_event record' do + expect do + Project.transaction do + project.update!(namespace_id: new_namespace1.id) + project.update!(namespace_id: new_namespace1.id) + end + end.to change(Projects::SyncEvent, :count).by(1) + + expect(project.sync_events.count).to eq(2) + end + end + end + end + private def finish_job(export_job) diff --git a/spec/models/user_spec.rb 
b/spec/models/user_spec.rb index 3813fcd18b1ad8..d1d080ea113362 100644 --- a/spec/models/user_spec.rb +++ b/spec/models/user_spec.rb @@ -1540,7 +1540,7 @@ allow(user).to receive(:update_highest_role) end - expect(SecureRandom).to receive(:hex).and_return('3b8ca303') + allow(SecureRandom).to receive(:hex).and_return('3b8ca303') user = create(:user) diff --git a/spec/services/ci/process_sync_events_service_spec.rb b/spec/services/ci/process_sync_events_service_spec.rb new file mode 100644 index 00000000000000..00b670ff54f28b --- /dev/null +++ b/spec/services/ci/process_sync_events_service_spec.rb @@ -0,0 +1,129 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Ci::ProcessSyncEventsService do + let!(:group) { create(:group) } + let!(:project1) { create(:project, group: group) } + let!(:project2) { create(:project, group: group) } + let!(:parent_group_1) { create(:group) } + let!(:parent_group_2) { create(:group) } + + subject(:service) { described_class.new(sync_event_class, hierarchy_class) } + + describe '#perform' do + subject(:execute) { service.execute } + + context 'for Projects::SyncEvent' do + let(:sync_event_class) { Projects::SyncEvent } + let(:hierarchy_class) { ::Ci::ProjectMirror } + + before do + Projects::SyncEvent.delete_all + + project1.update!(group: parent_group_1) + project2.update!(group: parent_group_2) + end + + it 'consumes events' do + expect { execute }.to change(Projects::SyncEvent, :count).from(2).to(0) + + expect(project1.ci_project_mirror).to have_attributes( + namespace_id: parent_group_1.id + ) + expect(project2.ci_project_mirror).to have_attributes( + namespace_id: parent_group_2.id + ) + end + + it 'enqueues Projects::ProcessSyncEventsWorker if any left' do + stub_const("#{described_class}::BATCH_SIZE", 1) + + expect(Projects::ProcessSyncEventsWorker).to receive(:perform_async) + + execute + end + + it 'does not enqueue Projects::ProcessSyncEventsWorker if no left' do + 
stub_const("#{described_class}::BATCH_SIZE", 2) + + expect(Projects::ProcessSyncEventsWorker).not_to receive(:perform_async) + + execute + end + + context 'when there is no event' do + before do + Projects::SyncEvent.delete_all + end + + it 'does nothing' do + expect { execute }.not_to change(Projects::SyncEvent, :count) + end + end + + context 'when the FF ci_namespace_project_mirrors is disabled' do + before do + stub_feature_flags(ci_namespace_project_mirrors: false) + end + + it 'does nothing' do + expect { execute }.not_to change(Projects::SyncEvent, :count) + end + end + end + + context 'for Namespaces::SyncEvent' do + let(:sync_event_class) { Namespaces::SyncEvent } + let(:hierarchy_class) { ::Ci::NamespaceMirror } + + before do + Namespaces::SyncEvent.delete_all + + group.update!(parent: parent_group_2) + parent_group_2.update!(parent: parent_group_1) + end + + shared_examples 'event consuming' do + it 'consumes events' do + expect { execute }.to change(Namespaces::SyncEvent, :count).from(2).to(0) + + expect(group.ci_namespace_mirror).to have_attributes( + traversal_ids: [parent_group_1.id, parent_group_2.id, group.id] + ) + expect(parent_group_2.ci_namespace_mirror).to have_attributes( + traversal_ids: [parent_group_1.id, parent_group_2.id] + ) + end + end + + context 'when the FFs sync_traversal_ids, use_traversal_ids and use_traversal_ids_for_ancestors are disabled' do + before do + stub_feature_flags(sync_traversal_ids: false, + use_traversal_ids: false, + use_traversal_ids_for_ancestors: false) + end + + it_behaves_like 'event consuming' + end + + it_behaves_like 'event consuming' + + it 'enqueues Namespaces::ProcessSyncEventsWorker if any left' do + stub_const("#{described_class}::BATCH_SIZE", 1) + + expect(Namespaces::ProcessSyncEventsWorker).to receive(:perform_async) + + execute + end + + it 'does not enqueue Namespaces::ProcessSyncEventsWorker if no left' do + stub_const("#{described_class}::BATCH_SIZE", 2) + + 
expect(Namespaces::ProcessSyncEventsWorker).not_to receive(:perform_async) + + execute + end + end + end +end diff --git a/spec/workers/namespaces/process_sync_events_worker_spec.rb b/spec/workers/namespaces/process_sync_events_worker_spec.rb new file mode 100644 index 00000000000000..59be1fffdb44be --- /dev/null +++ b/spec/workers/namespaces/process_sync_events_worker_spec.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Namespaces::ProcessSyncEventsWorker do + let!(:group1) { create(:group) } + let!(:group2) { create(:group) } + let!(:group3) { create(:group) } + + include_examples 'an idempotent worker' + + describe '#perform' do + subject(:perform) { described_class.new.perform } + + before do + group2.update!(parent: group1) + group3.update!(parent: group2) + end + + it 'consumes all sync events' do + expect { perform }.to change(Namespaces::SyncEvent, :count).from(5).to(0) + end + + it 'syncs namespace hierarchy traversal ids' do + expect { perform }.to change(Ci::NamespaceMirror, :all).to contain_exactly( + an_object_having_attributes(namespace_id: group1.id, traversal_ids: [group1.id]), + an_object_having_attributes(namespace_id: group2.id, traversal_ids: [group1.id, group2.id]), + an_object_having_attributes(namespace_id: group3.id, traversal_ids: [group1.id, group2.id, group3.id]) + ) + end + end +end diff --git a/spec/workers/projects/process_sync_events_worker_spec.rb b/spec/workers/projects/process_sync_events_worker_spec.rb new file mode 100644 index 00000000000000..600fbbc6b20631 --- /dev/null +++ b/spec/workers/projects/process_sync_events_worker_spec.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Projects::ProcessSyncEventsWorker do + let!(:group) { create(:group) } + let!(:project) { create(:project) } + + include_examples 'an idempotent worker' + + describe '#perform' do + subject(:perform) { described_class.new.perform } + + before do + 
project.update!(namespace: group) + end + + it 'consumes all sync events' do + expect { perform }.to change(Projects::SyncEvent, :count).from(2).to(0) + end + + it 'syncs project namespace id' do + expect { perform }.to change(Ci::ProjectMirror, :all).to contain_exactly( + an_object_having_attributes(namespace_id: group.id) + ) + end + end +end -- GitLab From ea6e0b81144403aff8c1d5992d426613ef22623e Mon Sep 17 00:00:00 2001 From: Furkan Ayhan Date: Tue, 7 Dec 2021 00:29:53 +0300 Subject: [PATCH 2/4] Apply reviewer feedbacks --- app/models/ci/namespace_mirror.rb | 10 +++++----- app/models/namespace.rb | 1 + app/models/project.rb | 1 + app/services/ci/process_sync_events_service.rb | 4 ++-- spec/lib/gitlab/import_export/all_models.yml | 2 ++ spec/models/ci/namespace_mirror_spec.rb | 2 +- spec/models/user_spec.rb | 4 +++- 7 files changed, 15 insertions(+), 9 deletions(-) diff --git a/app/models/ci/namespace_mirror.rb b/app/models/ci/namespace_mirror.rb index 656b9ea8fe4e29..81b0554573062f 100644 --- a/app/models/ci/namespace_mirror.rb +++ b/app/models/ci/namespace_mirror.rb @@ -6,9 +6,9 @@ module Ci class NamespaceMirror < ApplicationRecord belongs_to :namespace - scope :contains_namespaces, -> (ids) { - where('traversal_ids @> ARRAY[?]::int[]', ids.join(',')) - } + scope :contains_namespace, -> (id) do + where('traversal_ids @> ARRAY[?]::int[]', id) + end class << self def sync!(event) @@ -25,10 +25,10 @@ def sync!(event) private def sync_children_namespaces!(namespace_id, traversal_ids) - contains_namespaces([namespace_id]) + contains_namespace(namespace_id) .where.not(namespace_id: namespace_id) .update_all( - "traversal_ids = ARRAY[#{traversal_ids.join(',')}]::int[] || traversal_ids[array_position(traversal_ids, #{namespace_id}) + 1:]" + "traversal_ids = ARRAY[#{sanitize_sql(traversal_ids.join(','))}]::int[] || traversal_ids[array_position(traversal_ids, #{sanitize_sql(namespace_id)}) + 1:]" ) end end diff --git a/app/models/namespace.rb b/app/models/namespace.rb 
index 51d18b420e6422..9fe22a640f0fab 100644 --- a/app/models/namespace.rb +++ b/app/models/namespace.rb @@ -619,6 +619,7 @@ def enforce_minimum_path_length? path_changed? && !project_namespace? end + # SyncEvents are created by PG triggers (with the function `insert_namespaces_sync_event`) def schedule_sync_event_worker run_after_commit do Namespaces::SyncEvent.enqueue_worker diff --git a/app/models/project.rb b/app/models/project.rb index a6cbf478e293ba..026cf1e2e89711 100644 --- a/app/models/project.rb +++ b/app/models/project.rb @@ -2938,6 +2938,7 @@ def sync_attributes(project_namespace) project_namespace.visibility_level = visibility_level end + # SyncEvents are created by PG triggers (with the function `insert_projects_sync_event`) def schedule_sync_event_worker run_after_commit do Projects::SyncEvent.enqueue_worker diff --git a/app/services/ci/process_sync_events_service.rb b/app/services/ci/process_sync_events_service.rb index f36d849a3f63ea..d55f4e2b88650c 100644 --- a/app/services/ci/process_sync_events_service.rb +++ b/app/services/ci/process_sync_events_service.rb @@ -29,8 +29,8 @@ def process_events return if events.empty? 
- min = events[0] - max = events[-1] + min = events.first + max = events.last events.each { |event| @sync_class.sync!(event) } @sync_event_class.for_id(min.id..max.id).delete_all diff --git a/spec/lib/gitlab/import_export/all_models.yml b/spec/lib/gitlab/import_export/all_models.yml index 49052623436c75..cabc7cff8a177e 100644 --- a/spec/lib/gitlab/import_export/all_models.yml +++ b/spec/lib/gitlab/import_export/all_models.yml @@ -597,6 +597,8 @@ project: - security_scans - ci_feature_usages - bulk_import_exports +- ci_project_mirror +- sync_events award_emoji: - awardable - user diff --git a/spec/models/ci/namespace_mirror_spec.rb b/spec/models/ci/namespace_mirror_spec.rb index 24620b70d3f619..b4c71f51377f1d 100644 --- a/spec/models/ci/namespace_mirror_spec.rb +++ b/spec/models/ci/namespace_mirror_spec.rb @@ -9,7 +9,7 @@ let!(:group4) { create(:group, parent: group3) } describe '.sync!' do - let!(:event) { Namespaces::SyncEvent.create!(namespace: namespace) } + let!(:event) { namespace.sync_events.create! 
} subject(:sync) { described_class.sync!(event.reload) } diff --git a/spec/models/user_spec.rb b/spec/models/user_spec.rb index d1d080ea113362..04c9f755b0130a 100644 --- a/spec/models/user_spec.rb +++ b/spec/models/user_spec.rb @@ -1540,7 +1540,9 @@ allow(user).to receive(:update_highest_role) end - allow(SecureRandom).to receive(:hex).and_return('3b8ca303') + # Namespace#schedule_sync_event_worker => Sidekiq calls `SecureRandom.hex(12)` to generate `jid` + expect(SecureRandom).to receive(:hex).with(12).and_call_original + expect(SecureRandom).to receive(:hex).with(no_args).and_return('3b8ca303') user = create(:user) -- GitLab From 13036f0af473d3df5ac0de94b1501160e43fe814 Mon Sep 17 00:00:00 2001 From: Furkan Ayhan Date: Tue, 7 Dec 2021 15:54:49 +0300 Subject: [PATCH 3/4] Use id_in instead of for_id --- app/models/namespaces/sync_event.rb | 1 - app/models/projects/sync_event.rb | 1 - app/services/ci/process_sync_events_service.rb | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/app/models/namespaces/sync_event.rb b/app/models/namespaces/sync_event.rb index e1aa3a1788306b..8534d8afb8c84e 100644 --- a/app/models/namespaces/sync_event.rb +++ b/app/models/namespaces/sync_event.rb @@ -9,7 +9,6 @@ class Namespaces::SyncEvent < ApplicationRecord scope :preload_synced_relation, -> { preload(:namespace) } scope :order_by_id_asc, -> { order(id: :asc) } - scope :for_id, -> (id) { where(id: id) } def self.enqueue_worker ::Namespaces::ProcessSyncEventsWorker.perform_async # rubocop:disable CodeReuse/Worker diff --git a/app/models/projects/sync_event.rb b/app/models/projects/sync_event.rb index 46016733abb496..5221b00c55f072 100644 --- a/app/models/projects/sync_event.rb +++ b/app/models/projects/sync_event.rb @@ -9,7 +9,6 @@ class Projects::SyncEvent < ApplicationRecord scope :preload_synced_relation, -> { preload(:project) } scope :order_by_id_asc, -> { order(id: :asc) } - scope :for_id, -> (id) { where(id: id) } def self.enqueue_worker 
::Projects::ProcessSyncEventsWorker.perform_async # rubocop:disable CodeReuse/Worker diff --git a/app/services/ci/process_sync_events_service.rb b/app/services/ci/process_sync_events_service.rb index d55f4e2b88650c..0c4eb1e03c707a 100644 --- a/app/services/ci/process_sync_events_service.rb +++ b/app/services/ci/process_sync_events_service.rb @@ -33,7 +33,7 @@ def process_events max = events.last events.each { |event| @sync_class.sync!(event) } - @sync_event_class.for_id(min.id..max.id).delete_all + @sync_event_class.id_in(min.id..max.id).delete_all end def enqueue_worker_if_there_still_event -- GitLab From f10222a67596881d9c0691022fce56fe4e1abc78 Mon Sep 17 00:00:00 2001 From: Furkan Ayhan Date: Thu, 9 Dec 2021 18:30:43 +0300 Subject: [PATCH 4/4] Apply review changes --- app/models/ci/namespace_mirror.rb | 7 ++-- .../ci/process_sync_events_service.rb | 35 +++++++++++++------ .../namespaces/process_sync_events_worker.rb | 5 ++- .../projects/process_sync_events_worker.rb | 5 ++- spec/models/user_spec.rb | 6 ++-- 5 files changed, 41 insertions(+), 17 deletions(-) diff --git a/app/models/ci/namespace_mirror.rb b/app/models/ci/namespace_mirror.rb index 81b0554573062f..8a4be3139e83da 100644 --- a/app/models/ci/namespace_mirror.rb +++ b/app/models/ci/namespace_mirror.rb @@ -12,13 +12,14 @@ class NamespaceMirror < ApplicationRecord class << self def sync!(event) - traversal_ids = event.namespace.self_and_ancestor_ids(hierarchy_order: :desc) + namespace = event.namespace + traversal_ids = namespace.self_and_ancestor_ids(hierarchy_order: :desc) upsert({ namespace_id: event.namespace_id, traversal_ids: traversal_ids }, unique_by: :namespace_id) - # TODO: after fully implemented `sync_traversal_ids` FF, we will not need this method. - # However, we also need to change the PG trigger to reflect `namespaces.traversal_ids` changes + # It won't be necessary once we remove `sync_traversal_ids`. 
+ # More info: https://gitlab.com/gitlab-org/gitlab/-/issues/347541 sync_children_namespaces!(event.namespace_id, traversal_ids) end diff --git a/app/services/ci/process_sync_events_service.rb b/app/services/ci/process_sync_events_service.rb index 0c4eb1e03c707a..6be8c41dc6aaae 100644 --- a/app/services/ci/process_sync_events_service.rb +++ b/app/services/ci/process_sync_events_service.rb @@ -3,6 +3,7 @@ module Ci class ProcessSyncEventsService include Gitlab::Utils::StrongMemoize + include ExclusiveLeaseGuard BATCH_SIZE = 1000 @@ -14,30 +15,44 @@ def initialize(sync_event_class, sync_class) def execute return unless ::Feature.enabled?(:ci_namespace_project_mirrors, default_enabled: :yaml) - process_events + # preventing parallel processing over the same event table + try_obtain_lease { process_events } + enqueue_worker_if_there_still_event end private def process_events - events = @sync_event_class - .preload_synced_relation - .order_by_id_asc - .limit(BATCH_SIZE) - .to_a + events = @sync_event_class.preload_synced_relation.first(BATCH_SIZE) return if events.empty? - min = events.first - max = events.last + first = events.first + last_processed = nil + + begin + events.each do |event| + @sync_class.sync!(event) - events.each { |event| @sync_class.sync!(event) } - @sync_event_class.id_in(min.id..max.id).delete_all + last_processed = event + end + ensure + # remove events till the one that was last succesfully processed + @sync_event_class.id_in(first.id..last_processed.id).delete_all if last_processed + end end def enqueue_worker_if_there_still_event @sync_event_class.enqueue_worker if @sync_event_class.exists? 
end + + def lease_key + "#{super}::#{@sync_event_class}" + end + + def lease_timeout + 1.minute + end end end diff --git a/app/workers/namespaces/process_sync_events_worker.rb b/app/workers/namespaces/process_sync_events_worker.rb index 80985e13fa7cc3..f3c4f5bebb12ca 100644 --- a/app/workers/namespaces/process_sync_events_worker.rb +++ b/app/workers/namespaces/process_sync_events_worker.rb @@ -1,6 +1,9 @@ # frozen_string_literal: true module Namespaces + # This worker can be called multiple times at the same time but only one of them can + # process events at a time. This is ensured by `try_obtain_lease` in `Ci::ProcessSyncEventsService`. + # `until_executing` here is to reduce redundant worker enqueuing. class ProcessSyncEventsWorker include ApplicationWorker @@ -9,8 +12,8 @@ class ProcessSyncEventsWorker feature_category :sharding urgency :high - deduplicate :until_executed idempotent! + deduplicate :until_executing def perform ::Ci::ProcessSyncEventsService.new(::Namespaces::SyncEvent, ::Ci::NamespaceMirror).execute diff --git a/app/workers/projects/process_sync_events_worker.rb b/app/workers/projects/process_sync_events_worker.rb index 40242059550f3d..b7c4b4de3d01c7 100644 --- a/app/workers/projects/process_sync_events_worker.rb +++ b/app/workers/projects/process_sync_events_worker.rb @@ -1,6 +1,9 @@ # frozen_string_literal: true module Projects + # This worker can be called multiple times at the same time but only one of them can + # process events at a time. This is ensured by `try_obtain_lease` in `Ci::ProcessSyncEventsService`. + # `until_executing` here is to reduce redundant worker enqueuing. class ProcessSyncEventsWorker include ApplicationWorker @@ -9,8 +12,8 @@ class ProcessSyncEventsWorker feature_category :sharding urgency :high - deduplicate :until_executed idempotent! 
+ deduplicate :until_executing def perform ::Ci::ProcessSyncEventsService.new(::Projects::SyncEvent, ::Ci::ProjectMirror).execute diff --git a/spec/models/user_spec.rb b/spec/models/user_spec.rb index 04c9f755b0130a..f44d417b1ea6e2 100644 --- a/spec/models/user_spec.rb +++ b/spec/models/user_spec.rb @@ -1540,8 +1540,10 @@ allow(user).to receive(:update_highest_role) end - # Namespace#schedule_sync_event_worker => Sidekiq calls `SecureRandom.hex(12)` to generate `jid` - expect(SecureRandom).to receive(:hex).with(12).and_call_original + allow_next_instance_of(Namespaces::UserNamespace) do |namespace| + allow(namespace).to receive(:schedule_sync_event_worker) + end + expect(SecureRandom).to receive(:hex).with(no_args).and_return('3b8ca303') user = create(:user) -- GitLab