# Patch summary: adds the cron worker MergeRequests::ProcessScheduledMergeWorker.
#
# Worker behaviour (app/workers/merge_requests/process_scheduled_merge_worker.rb):
#   * Runs inside an exclusive lease (`in_lock`) keyed on the underscored class
#     name, with ttl LOCK_TTL (5.minutes) and LOCK_RETRY (3) retries. The spec
#     expects `try_obtain` to be called LOCK_RETRY + 1 times and
#     Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError to be raised when
#     the lease is already taken.
#   * Selects MergeRequests::MergeSchedule rows with `merge_after <= Time.zone.now`,
#     ordered by (merge_after ASC, merge_request_id ASC), and walks them with
#     Gitlab::Pagination::Keyset::Iterator in batches of BATCH_SIZE (500).
#   * For each batch, loads `MergeRequest.with_auto_merge_enabled` for the batch's
#     merge_request_ids and bulk-enqueues AutoMergeProcessWorker with a delay of
#     `[1, index * DELAY].max`, where DELAY is 7.seconds and index is the
#     zero-based batch number (the spec pins 1.second for the first batch and
#     7.seconds for the second).
#
# Supporting changes:
#   * MergeSchedule now includes FromUnion.
#     NOTE(review): presumably required by the keyset iterator -- confirm.
#   * all_queues.yml registers the queue `cronjob:merge_requests_process_scheduled_merge`
#     (urgency :low, resource_boundary :cpu, idempotent false).
#   * 1_settings.rb registers the cron job; the schedule defaults to '*/1 * * * *'
#     when no `cron` value is already configured (`||=`).
#   * A migration adds a concurrent btree index on
#     merge_request_merge_schedules (merge_after, merge_request_id), the same
#     columns the worker orders by; structure.sql and the schema_migrations
#     checksum file are updated to match.
#
# NOTE(review): the line structure of this patch was restored from a
# whitespace-collapsed copy; indentation and blank context lines were
# reconstructed from convention and hunk counts -- verify with `git apply --check`.
diff --git a/app/models/merge_requests/merge_schedule.rb b/app/models/merge_requests/merge_schedule.rb
index 5749768b5a10d8368d1c1df0634091013001a19e..f5ea97afa22aedc098ed2bc78687f918d4f9d0d8 100644
--- a/app/models/merge_requests/merge_schedule.rb
+++ b/app/models/merge_requests/merge_schedule.rb
@@ -2,6 +2,8 @@
 
 module MergeRequests
   class MergeSchedule < ApplicationRecord
+    include FromUnion
+
     self.table_name = 'merge_request_merge_schedules'
 
     belongs_to :merge_request, optional: false, inverse_of: :merge_schedule
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index f42a991b82b8c4ee20bc3ac6d3cdb97b3ed7992b..0c502cdb20d61ec847b81596895621fd23865b12 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -615,6 +615,15 @@
   :weight: 1
   :idempotent: false
   :tags: []
+- :name: cronjob:merge_requests_process_scheduled_merge
+  :worker_name: MergeRequests::ProcessScheduledMergeWorker
+  :feature_category: :code_review_workflow
+  :has_external_dependencies: false
+  :urgency: :low
+  :resource_boundary: :cpu
+  :weight: 1
+  :idempotent: false
+  :tags: []
 - :name: cronjob:metrics_global_metrics_update
   :worker_name: Metrics::GlobalMetricsUpdateWorker
   :feature_category: :observability
diff --git a/app/workers/merge_requests/process_scheduled_merge_worker.rb b/app/workers/merge_requests/process_scheduled_merge_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..da2cd756a31827e248ca607f5bc62048f59284e0
--- /dev/null
+++ b/app/workers/merge_requests/process_scheduled_merge_worker.rb
@@ -0,0 +1,78 @@
+# frozen_string_literal: true
+
+module MergeRequests
+  class ProcessScheduledMergeWorker # rubocop:disable Scalability/IdempotentWorker -- time dependent queries can't be idempotent
+    include ApplicationWorker
+
+    data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency -- this is a cronjob
+
+    include CronjobQueue
+    include ::Gitlab::ExclusiveLeaseHelpers
+
+    LOCK_RETRY = 3
+    LOCK_TTL = 5.minutes
+    DELAY = 7.seconds
+    BATCH_SIZE = 500
+
+    feature_category :code_review_workflow
+    worker_resource_boundary :cpu
+
+    def perform
+      in_lock(lock_key, **lock_params) do
+        # rubocop:disable CodeReuse/ActiveRecord -- using keyset pagination with custom order for better performance
+        order = Gitlab::Pagination::Keyset::Order.build([
+          Gitlab::Pagination::Keyset::ColumnOrderDefinition.new(
+            attribute_name: 'merge_after',
+            order_expression: MergeRequests::MergeSchedule.arel_table[:merge_after].asc,
+            nullable: :not_nullable
+          ),
+          Gitlab::Pagination::Keyset::ColumnOrderDefinition.new(
+            attribute_name: 'merge_request_id',
+            order_expression: MergeRequests::MergeSchedule.arel_table[:merge_request_id].asc,
+            nullable: :not_nullable
+          )
+        ])
+
+        scope = MergeRequests::MergeSchedule
+          .where(merge_after: ..Time.zone.now)
+          .order(order)
+          .select(:merge_after, :merge_request_id)
+
+        iterator = Gitlab::Pagination::Keyset::Iterator.new(scope: scope)
+
+        iteration = 0
+        iterator.each_batch(of: BATCH_SIZE) do |records|
+          loaded_records = MergeRequest
+            .with_auto_merge_enabled
+            .where(id: records.to_a.pluck(:merge_request_id))
+
+          enqueue_auto_merge_process_worker(loaded_records, iteration)
+          iteration += 1
+        end
+        # rubocop:enable CodeReuse/ActiveRecord
+      end
+    end
+
+    private
+
+    def lock_key
+      self.class.name.underscore
+    end
+
+    def lock_params
+      {
+        ttl: LOCK_TTL,
+        retries: LOCK_RETRY
+      }
+    end
+
+    def enqueue_auto_merge_process_worker(merge_requests, index)
+      AutoMergeProcessWorker.bulk_perform_in_with_contexts(
+        [1, index * DELAY].max,
+        merge_requests,
+        arguments_proc: ->(merge_request) { [merge_request.id] },
+        context_proc: ->(merge_request) { { project: merge_request.project, user: merge_request.merge_user } }
+      )
+    end
+  end
+end
diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb
index 14903eb3077c90c24771ee32505c65c7699068aa..7b8bc050d58dd92d9d20ba04a7f06e8ffb7909ab 100644
--- a/config/initializers/1_settings.rb
+++ b/config/initializers/1_settings.rb
@@ -729,6 +729,9 @@
 Settings.cron_jobs['database_monitor_locked_tables_cron_worker'] ||= {}
 Settings.cron_jobs['database_monitor_locked_tables_cron_worker']['cron'] ||= '30 7 */3 * *'
 Settings.cron_jobs['database_monitor_locked_tables_cron_worker']['job_class'] = 'Database::MonitorLockedTablesWorker'
+Settings.cron_jobs['merge_requests_process_scheduled_merge'] ||= {}
+Settings.cron_jobs['merge_requests_process_scheduled_merge']['cron'] ||= '*/1 * * * *'
+Settings.cron_jobs['merge_requests_process_scheduled_merge']['job_class'] = 'MergeRequests::ProcessScheduledMergeWorker'
 
 Gitlab.ee do
   Settings.cron_jobs['analytics_devops_adoption_create_all_snapshots_worker'] ||= {}
diff --git a/db/migrate/20241008193537_add_merge_after_index_to_merge_schedules.rb b/db/migrate/20241008193537_add_merge_after_index_to_merge_schedules.rb
new file mode 100644
index 0000000000000000000000000000000000000000..7040b6242d3525083680eec53afab7897dbb4cbc
--- /dev/null
+++ b/db/migrate/20241008193537_add_merge_after_index_to_merge_schedules.rb
@@ -0,0 +1,16 @@
+# frozen_string_literal: true
+
+class AddMergeAfterIndexToMergeSchedules < Gitlab::Database::Migration[2.2]
+  milestone '17.5'
+  disable_ddl_transaction!
+
+  INDEX_NAME = 'index_merge_request_merge_schedules_on_merge_after_and_mr_id'
+
+  def up
+    add_concurrent_index :merge_request_merge_schedules, %i[merge_after merge_request_id], name: INDEX_NAME
+  end
+
+  def down
+    remove_concurrent_index_by_name :merge_request_merge_schedules, INDEX_NAME
+  end
+end
diff --git a/db/schema_migrations/20241008193537 b/db/schema_migrations/20241008193537
new file mode 100644
index 0000000000000000000000000000000000000000..0f45f5c2b23ca641a0a564d4d5f092afc74846bc
--- /dev/null
+++ b/db/schema_migrations/20241008193537
@@ -0,0 +1 @@
+fe3d36a6325b811509c93659dc36a7462f5b06a7a8c2be844aad7f53e8685f3f
\ No newline at end of file
diff --git a/db/structure.sql b/db/structure.sql
index 2fb619b538bcfaa9ea4d158eda8f352dc8188927..fb69b3fcdd60671ac54d339ea805e377cbaa3945 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -29422,6 +29422,8 @@ CREATE INDEX index_merge_request_diffs_on_project_id ON merge_request_diffs USIN
 
 CREATE UNIQUE INDEX index_merge_request_diffs_on_unique_merge_request_id ON merge_request_diffs USING btree (merge_request_id) WHERE (diff_type = 2);
 
+CREATE INDEX index_merge_request_merge_schedules_on_merge_after_and_mr_id ON merge_request_merge_schedules USING btree (merge_after, merge_request_id);
+
 CREATE UNIQUE INDEX index_merge_request_merge_schedules_on_merge_request_id ON merge_request_merge_schedules USING btree (merge_request_id);
 
 CREATE INDEX index_merge_request_merge_schedules_on_project_id ON merge_request_merge_schedules USING btree (project_id);
diff --git a/spec/workers/merge_requests/process_scheduled_merge_worker_spec.rb b/spec/workers/merge_requests/process_scheduled_merge_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..f6100b72034c6fa2e22aa3d66fa15634ae45df1a
--- /dev/null
+++ b/spec/workers/merge_requests/process_scheduled_merge_worker_spec.rb
@@ -0,0 +1,52 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe MergeRequests::ProcessScheduledMergeWorker, :sidekiq_inline, feature_category: :code_review_workflow do
+  include ExclusiveLeaseHelpers
+
+  subject(:perform) { described_class.new.perform }
+
+  let_it_be(:user) { create(:user) }
+
+  let!(:scheduled_mr) do
+    create(:merge_request, :unique_branches, author: user, merge_user: user, auto_merge_enabled: true).tap do |mr|
+      create(:merge_request_merge_schedule, merge_request: mr, merge_after: 1.minute.ago)
+    end
+  end
+
+  context 'when max retry attempts reach' do
+    let!(:lease) { stub_exclusive_lease_taken(described_class.name.underscore) }
+
+    it 'raises an error' do
+      expect(lease).to receive(:try_obtain).exactly(described_class::LOCK_RETRY + 1).times
+      expect { perform }.to raise_error(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
+    end
+  end
+
+  context 'with scheduling delay' do
+    before do
+      stub_const("#{described_class}::BATCH_SIZE", 1)
+    end
+
+    let!(:other_scheduled_mr) do
+      create(:merge_request, :unique_branches, author: user, merge_user: user, auto_merge_enabled: true).tap do |mr|
+        create(:merge_request_merge_schedule, merge_request: mr, merge_after: 1.minute.ago)
+      end
+    end
+
+    it 'schedules AutoMergeProcessWorker for each batch with increasing delay' do
+      expect(AutoMergeProcessWorker)
+        .to receive(:bulk_perform_in)
+        .with(1.second, [[scheduled_mr.id]])
+        .and_call_original
+
+      expect(AutoMergeProcessWorker)
+        .to receive(:bulk_perform_in)
+        .with(7.seconds, [[other_scheduled_mr.id]])
+        .and_call_original
+
+      perform
+    end
+  end
+end