diff --git a/lib/gitlab/ci/partitioning/rebalancing.rb b/lib/gitlab/ci/partitioning/rebalancing.rb
new file mode 100644
index 0000000000000000000000000000000000000000..af3b9e1baa68d4c3330b2f090626461e079237c0
--- /dev/null
+++ b/lib/gitlab/ci/partitioning/rebalancing.rb
@@ -0,0 +1,151 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module Ci
+    module Partitioning
+      # Sets `partition_id` to `to` for the pipelines selected by `pipelines`
+      # and for the rows that reference them in the stages, jobs, variables
+      # and sources tables handled below.
+      #
+      # Pipelines are walked in batches of BATCH_SIZE; each batch and its
+      # dependent rows are updated inside one transaction.
+      class Rebalancing
+        include Gitlab::Database::DynamicModelHelpers
+        include Gitlab::Utils::StrongMemoize
+
+        BATCH_SIZE = 100
+
+        # Tables filtered by `build_id` and updated with one statement per
+        # batch of jobs (no inner batching).
+        CI_BUILDS_ONE_TO_ONE_TABLES = %i[
+          p_ci_builds_metadata
+          ci_pending_builds
+          ci_running_builds
+          ci_build_trace_metadata
+          ci_builds_runner_session
+        ].freeze
+
+        attr_reader :to, :pipelines_range
+
+        # @param to [Integer] value written to the partition columns
+        # @param pipelines [Range] ids used to select rows from ci_pipelines
+        def initialize(to:, pipelines:)
+          @to = to
+          @pipelines_range = pipelines
+        end
+
+        # Runs the update, one transaction per batch of pipelines.
+        def perform
+          each_pipelines_batch do |pipelines|
+            ::Ci::Pipeline.transaction do
+              pipelines.update_all(partition_id: to)
+              process_stages(pipelines)
+              process_jobs(pipelines)
+              each_pipeline_variables_batch(pipelines) { |variables| variables.update_all(partition_id: to) }
+              each_sources_pipelines_batch(pipelines) { |sources| sources.update_all(partition_id: to) }
+            end
+          end
+        end
+
+        private
+
+        def process_stages(pipelines)
+          each_stages_batch(pipelines) do |stages|
+            stages.update_all(partition_id: to)
+          end
+        end
+
+        # These tables are updated through FK constraints from ci_builds
+        # and don't need to be handled here:
+        # - ci_unit_test_failures
+        # - ci_build_pending_states
+        # - ci_build_trace_chunks
+        #
+        def process_jobs(pipelines)
+          each_jobs_batch(pipelines) do |jobs|
+            jobs.update_all(partition_id: to)
+            process_jobs_has_one_relations(jobs)
+            process_jobs_has_many_relations(jobs)
+          end
+        end
+
+        def process_jobs_has_one_relations(jobs)
+          CI_BUILDS_ONE_TO_ONE_TABLES.each do |table_name|
+            batchable_model(table_name)
+              .where(build_id: jobs) # rubocop: disable CodeReuse/ActiveRecord
+              .update_all(partition_id: to)
+          end
+        end
+
+        # Rows found through `source_job_id` in ci_sources_pipelines get
+        # `source_partition_id` updated; every other table gets `partition_id`.
+        def process_jobs_has_many_relations(jobs)
+          each_build_needs_batch(jobs) { |needs| needs.update_all(partition_id: to) }
+          each_build_report_results_batch(jobs) { |reports| reports.update_all(partition_id: to) }
+          each_job_artifacts_batch(jobs) { |artifacts| artifacts.update_all(partition_id: to) }
+          each_job_variables_batch(jobs) { |variables| variables.update_all(partition_id: to) }
+          each_job_sources_pipelines_batch(jobs) { |sources| sources.update_all(source_partition_id: to) }
+        end
+
+        def each_pipelines_batch(&block)
+          each_batchable_model(:ci_pipelines, { id: pipelines_range }, &block)
+        end
+
+        def each_stages_batch(pipelines, &block)
+          each_batchable_model(:ci_stages, { pipeline_id: pipelines }, &block)
+        end
+
+        def each_jobs_batch(pipelines, &block)
+          each_batchable_model(:ci_builds, { commit_id: pipelines }, &block)
+        end
+
+        def each_build_needs_batch(jobs, &block)
+          each_batchable_model(:ci_build_needs, { build_id: jobs }, &block)
+        end
+
+        def each_build_report_results_batch(jobs, &block)
+          each_batchable_model(:ci_build_report_results, { build_id: jobs }, &block)
+        end
+
+        def each_job_artifacts_batch(jobs, &block)
+          each_batchable_model(:ci_job_artifacts, { job_id: jobs }, &block)
+        end
+
+        def each_job_variables_batch(jobs, &block)
+          each_batchable_model(:ci_job_variables, { job_id: jobs }, &block)
+        end
+
+        def each_pipeline_variables_batch(pipelines, &block)
+          each_batchable_model(:ci_pipeline_variables, { pipeline_id: pipelines }, &block)
+        end
+
+        def each_sources_pipelines_batch(pipelines, &block)
+          each_batchable_model(:ci_sources_pipelines, { pipeline_id: pipelines }, &block)
+        end
+
+        def each_job_sources_pipelines_batch(jobs, &block)
+          each_batchable_model(:ci_sources_pipelines, { source_job_id: jobs }, &block)
+        end
+
+        # Filters `table_name` by `where_values` and hands the result to the
+        # block through `each_batch(of: BATCH_SIZE)`.
+        def each_batchable_model(table_name, where_values, &block)
+          batchable_model(table_name)
+            .where(where_values) # rubocop: disable CodeReuse/ActiveRecord
+            .each_batch(of: BATCH_SIZE, &block)
+        end
+
+        # Builds the model for `table_name`, keyed by table name in strong_memoize_with.
+        def batchable_model(table_name)
+          strong_memoize_with(table_name) do
+            define_batchable_model(table_name, connection: connection)
+          end
+        end
+
+        def connection
+          ::Ci::ApplicationRecord.connection
+        end
+      end
+    end
+  end
+end
diff --git a/lib/tasks/ci/partitioning.rake b/lib/tasks/ci/partitioning.rake
new file mode 100644
index 0000000000000000000000000000000000000000..71b5c60128ee5eec326fffab7003b4990df5538d
--- /dev/null
+++ b/lib/tasks/ci/partitioning.rake
@@ -0,0 +1,20 @@
+# frozen_string_literal: true
+
+namespace :ci do
+  namespace :partitioning do
+    desc "GitLab | CI | Fix partition_id values for partitionable resources"
+    task :rebalance, [:partition_id, :min_pipeline_id, :max_pipeline_id] => :environment do |_t, args|
+      # Parse strictly: `to_i` would turn a missing or malformed argument into 0
+      # (or a truncated number) and run the update with values nobody asked for.
+      to, min_pipeline_id, max_pipeline_id = %i[partition_id min_pipeline_id max_pipeline_id].map do |name|
+        Integer(args[name].to_s, 10)
+      rescue ArgumentError
+        raise ArgumentError, "#{name} must be an integer, got #{args[name].inspect}"
+      end
+
+      ::Gitlab::Ci::Partitioning::Rebalancing
+        .new(to: to, pipelines: Range.new(min_pipeline_id, max_pipeline_id))
+        .perform
+    end
+  end
+end
diff --git a/spec/lib/gitlab/ci/partitioning/rebalancing_spec.rb b/spec/lib/gitlab/ci/partitioning/rebalancing_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..f331146f0e0b48d4457bd88e6aa3463a685cb0ea
--- /dev/null
+++ b/spec/lib/gitlab/ci/partitioning/rebalancing_spec.rb
@@ -0,0 +1,43 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::Ci::Partitioning::Rebalancing, :ci_partitioning do
+  subject do
+    described_class.new(
+      to: ci_testing_partition_id,
+      pipelines: pipelines_range
+    ).perform
+  end
+
+  let_it_be(:pipeline) { create(:ci_pipeline) }
+  let_it_be(:stage) { create(:ci_stage, pipeline: pipeline) }
+  let_it_be(:build) { create(:ci_build, pipeline: pipeline, ci_stage: stage) }
+  let_it_be(:bridge) { create(:ci_bridge, pipeline: pipeline, ci_stage: stage) }
+  let_it_be(:build_need) { create(:ci_build_need, build: build) }
+  let_it_be(:bridge_need) { create(:ci_build_need, build: bridge) }
+
+  let(:pipelines_range) { Range.new(pipeline.id, pipeline.id) }
+
+  it 'updates partition_id on the pipeline and its dependent records' do
+    expect(pipeline.partition_id).to eq(100)
+    expect(stage.partition_id).to eq(100)
+    expect(build.partition_id).to eq(100)
+    expect(bridge.partition_id).to eq(100)
+    expect(build.metadata.partition_id).to eq(100)
+    expect(bridge.metadata.partition_id).to eq(100)
+    expect(build_need.partition_id).to eq(100)
+    expect(bridge_need.partition_id).to eq(100)
+
+    subject
+
+    expect(pipeline.reload.partition_id).to eq(ci_testing_partition_id)
+    expect(stage.reload.partition_id).to eq(ci_testing_partition_id)
+    expect(build.reload.partition_id).to eq(ci_testing_partition_id)
+    expect(bridge.reload.partition_id).to eq(ci_testing_partition_id)
+    expect(build.reload.metadata.partition_id).to eq(ci_testing_partition_id)
+    expect(bridge.reload.metadata.partition_id).to eq(ci_testing_partition_id)
+    expect(build_need.reload.partition_id).to eq(ci_testing_partition_id)
+    expect(bridge_need.reload.partition_id).to eq(ci_testing_partition_id)
+  end
+end