diff --git a/ee/app/models/ci/finished_build_ch_sync_event.rb b/ee/app/models/ci/finished_build_ch_sync_event.rb index 4e73c98a3e1399cc93ca648748731c0708ec18ff..a2cc0e40473a937450e949f0accabae4eb8dd229 100644 --- a/ee/app/models/ci/finished_build_ch_sync_event.rb +++ b/ee/app/models/ci/finished_build_ch_sync_event.rb @@ -2,6 +2,8 @@ module Ci class FinishedBuildChSyncEvent < Ci::ApplicationRecord + include EachBatch + PARTITION_DURATION = 1.day include PartitionedTable @@ -24,6 +26,8 @@ class FinishedBuildChSyncEvent < Ci::ApplicationRecord validates :build_id, presence: true validates :build_finished_at, presence: true + scope :order_by_build_id, -> { order(:build_id) } + scope :pending, -> { where(processed: false) } scope :for_partition, ->(partition) { where(partition: partition) } end diff --git a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb new file mode 100644 index 0000000000000000000000000000000000000000..29cc888ca67fc9f2ebb326eff3375087cdf46c9f --- /dev/null +++ b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb @@ -0,0 +1,158 @@ +# frozen_string_literal: true + +module ClickHouse + module DataIngestion + class CiFinishedBuildsSyncService + include Gitlab::ExclusiveLeaseHelpers + include Gitlab::Utils::StrongMemoize + + # the job is scheduled every 3 minutes and we will allow maximum 6 minutes runtime + # we must allow a minimum of 2 minutes + 15 seconds PG timeout + 1 minute for the various CH Gitlab::HTTP timeouts + MAX_TTL = 6.minutes.to_i + MAX_RUNTIME = 120.seconds + BUILDS_BATCH_SIZE = 500 + INSERT_BATCH_SIZE = 5000 + BUILD_ID_PARTITIONS = 100 + + def initialize(worker_index: 0, total_workers: 1) + @runtime_limiter = Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME) + @worker_index = worker_index + @total_workers = total_workers + end + + def execute + @total_record_count = 0 + + unless enabled? 
+ return ServiceResponse.error( + message: 'Feature ci_data_ingestion_to_click_house is disabled', + reason: :disabled + ) + end + + unless ClickHouse::Client.configuration.databases[:main].present? + return ServiceResponse.error(message: 'ClickHouse database is not configured', reason: :db_not_configured) + end + + # Prevent parallel jobs for the same worker index (the lease key includes @worker_index) + in_lock("#{self.class.name.underscore}/worker/#{@worker_index}", ttl: MAX_TTL, retries: 0) do + ::Gitlab::Database::LoadBalancing::Session.without_sticky_writes do + report = insert_new_finished_builds + + ServiceResponse.success(payload: report) + end + end + rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError => e + # Skip retrying; just let the next scheduled worker start after a few minutes + ServiceResponse.error(message: e.message, reason: :skipped) + end + + private + + def insert_new_finished_builds + loop do + @processed_record_ids = [] + + CsvBuilder::Gzip.new(process_batch, CSV_MAPPING).render do |tempfile| + File.open(tempfile.path) do |f| + ClickHouse::Client.insert_csv(INSERT_FINISHED_BUILDS_QUERY, f, :main) + end + end + + Ci::FinishedBuildChSyncEvent.primary_key_in(@processed_record_ids).update_all(processed: true) + @processed_record_ids = [] + + unless continue?
+ return { + records_inserted: @total_record_count, + reached_end_of_table: !@more_records + } + end + end + end + + BUILD_FIELD_NAMES = %i[id project_id pipeline_id status runner_id].freeze + BUILD_EPOCH_FIELD_NAMES = %i[created_at queued_at started_at finished_at].freeze + RUNNER_FIELD_NAMES = %i[run_untagged type].freeze + RUNNER_MANAGER_FIELD_NAMES = %i[system_xid version revision platform architecture].freeze + + CSV_MAPPING = { + **BUILD_FIELD_NAMES.index_with { |n| n }, + **BUILD_EPOCH_FIELD_NAMES.index_with { |n| :"casted_#{n}" }, + **RUNNER_FIELD_NAMES.map { |n| :"runner_#{n}" }.index_with { |n| n }, + **RUNNER_MANAGER_FIELD_NAMES.map { |n| :"runner_manager_#{n}" }.index_with { |n| n } + }.freeze + + INSERT_FINISHED_BUILDS_QUERY = <<~SQL.squish + INSERT INTO ci_finished_builds (#{CSV_MAPPING.keys.join(', ')}) + SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV + SQL + + def enabled? + Feature.enabled?(:ci_data_ingestion_to_click_house) + end + + def finished_build_projections + [ + *BUILD_FIELD_NAMES, + *BUILD_EPOCH_FIELD_NAMES.map { |n| "EXTRACT(epoch FROM #{::Ci::Build.table_name}.#{n}) AS casted_#{n}" }, + "#{::Ci::Runner.table_name}.run_untagged AS runner_run_untagged", + "#{::Ci::Runner.table_name}.runner_type AS runner_type", + *RUNNER_MANAGER_FIELD_NAMES.map { |n| "#{::Ci::RunnerManager.table_name}.#{n} AS runner_manager_#{n}" } + ] + end + strong_memoize_attr :finished_build_projections + + def continue? + @more_records && !@runtime_limiter.over_time? 
+ end + + def process_batch + Enumerator.new do |yielder| + @more_records = false + + total_records_yielded = 0 + + keyset_iterator_scope.each_batch(of: BUILDS_BATCH_SIZE) do |events_batch| + build_ids = events_batch.pluck(:build_id) # rubocop: disable CodeReuse/ActiveRecord + @processed_record_ids.concat(build_ids) # mark every fetched event, even if its build was deleted, so it is not re-read forever + Ci::Build.id_in(build_ids) + .left_outer_joins(:runner, :runner_manager) + .select(:finished_at, *finished_build_projections) + .each do |build| + yielder << build + total_records_yielded += 1 + end + + @more_records = build_ids.count == BUILDS_BATCH_SIZE + break unless continue? && total_records_yielded < INSERT_BATCH_SIZE + end + + @total_record_count += total_records_yielded + end + end + + def keyset_iterator_scope + lower_bound = (@worker_index * BUILD_ID_PARTITIONS / @total_workers).to_i + upper_bound = ((@worker_index + 1) * BUILD_ID_PARTITIONS / @total_workers).to_i - 1 # generate_series is inclusive; do not overlap the next worker's first partition + + table_name = Ci::FinishedBuildChSyncEvent.quoted_table_name + array_scope = Ci::FinishedBuildChSyncEvent.select(:build_id_partition) + .from("generate_series(#{lower_bound}, #{upper_bound}) as #{table_name}(build_id_partition)") # rubocop: disable CodeReuse/ActiveRecord + + opts = { + in_operator_optimization_options: { + array_scope: array_scope, + array_mapping_scope: ->(id_expression) do + Ci::FinishedBuildChSyncEvent + .where(Arel.sql("(build_id % #{BUILD_ID_PARTITIONS})") # rubocop: disable CodeReuse/ActiveRecord + .eq(id_expression)) + end + } + } + + Gitlab::Pagination::Keyset::Iterator.new(scope: Ci::FinishedBuildChSyncEvent.pending.order_by_build_id, **opts) + end + end + end +end diff --git a/ee/app/workers/ee/ci/build_finished_worker.rb b/ee/app/workers/ee/ci/build_finished_worker.rb index a27e7894731419342e9574b5051274fafaacdd01..e7455c990f42f81312c72732c60444f8ae08e8e8 100644 --- a/ee/app/workers/ee/ci/build_finished_worker.rb +++ b/ee/app/workers/ee/ci/build_finished_worker.rb @@ -18,6 +18,10 @@ def process_build(build) end
::Ci::InstanceRunnerFailedJobs.track(build) if build.failed? + + if build.finished_at.present? && generate_finished_builds_sync_events? + ::Ci::FinishedBuildChSyncEvent.new(build_id: build.id, build_finished_at: build.finished_at).save + end end private @@ -31,6 +35,11 @@ def requirements_available?(build) !build.project.requirements.empty? && Ability.allowed?(build.user, :create_requirement_test_report, build.project) end + + def generate_finished_builds_sync_events? + ::Feature.enabled?(:generate_ci_finished_builds_sync_events) && + ::License.feature_available?(:runner_performance_insights) + end end end end diff --git a/ee/config/feature_flags/development/ci_data_ingestion_to_click_house.yml b/ee/config/feature_flags/development/ci_data_ingestion_to_click_house.yml new file mode 100644 index 0000000000000000000000000000000000000000..40bd88964291ee832ffa2378f5ec2eb261c6729a --- /dev/null +++ b/ee/config/feature_flags/development/ci_data_ingestion_to_click_house.yml @@ -0,0 +1,8 @@ +--- +name: ci_data_ingestion_to_click_house +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/132010 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/424866 +milestone: '16.5' +type: development +group: group::runner +default_enabled: false diff --git a/ee/config/feature_flags/development/generate_ci_finished_builds_sync_events.yml b/ee/config/feature_flags/development/generate_ci_finished_builds_sync_events.yml new file mode 100644 index 0000000000000000000000000000000000000000..4d5f6073334eda3412b31ae9fb0f2e6bf3c152f3 --- /dev/null +++ b/ee/config/feature_flags/development/generate_ci_finished_builds_sync_events.yml @@ -0,0 +1,8 @@ +--- +name: generate_ci_finished_builds_sync_events +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/132010 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/424866 +milestone: '16.5' +type: development +group: group::runner +default_enabled: false diff --git 
a/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb b/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb index 5071719242d72050208c1f443142ed61042f8fc7..ea7e7ab2305ba3c9128f76ab3bf5630a37786681 100644 --- a/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb +++ b/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb @@ -10,16 +10,6 @@ it { is_expected.to validate_presence_of(:build_finished_at) } end - describe '.pending' do - subject(:scope) { described_class.pending } - - let_it_be(:event1) { described_class.create!(build_id: 1, build_finished_at: 2.hours.ago, processed: true) } - let_it_be(:event2) { described_class.create!(build_id: 2, build_finished_at: 1.hour.ago) } - let_it_be(:event3) { described_class.create!(build_id: 3, build_finished_at: 1.hour.ago, processed: true) } - - it { is_expected.to contain_exactly(event2) } - end - describe '.for_partition', :freeze_time do subject(:scope) { described_class.for_partition(partition) } @@ -154,4 +144,28 @@ end end end + + describe 'sorting' do + let_it_be(:event3) { described_class.create!(build_id: 3, build_finished_at: 2.hours.ago, processed: true) } + let_it_be(:event1) { described_class.create!(build_id: 1, build_finished_at: 1.hour.ago) } + let_it_be(:event2) { described_class.create!(build_id: 2, build_finished_at: 1.hour.ago, processed: true) } + + describe '.order_by_build_id' do + subject(:scope) { described_class.order_by_build_id } + + it { is_expected.to eq([event1, event2, event3]) } + end + end + + describe 'scopes' do + let_it_be(:event1) { described_class.create!(build_id: 1, build_finished_at: 2.hours.ago, processed: true) } + let_it_be(:event2) { described_class.create!(build_id: 2, build_finished_at: 1.hour.ago) } + let_it_be(:event3) { described_class.create!(build_id: 3, build_finished_at: 1.hour.ago, processed: true) } + + describe '.pending' do + subject(:scope) { described_class.pending } + + it { is_expected.to contain_exactly(event2) } + end + end end diff --git 
a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..b61e82bab1a3f6cc296c6c132f1b9f37eafb0634 --- /dev/null +++ b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb @@ -0,0 +1,256 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ClickHouse::DataIngestion::CiFinishedBuildsSyncService, + :click_house, feature_category: :runner_fleet do + subject(:execute) { service.execute } + + let(:service) { described_class.new } + + let_it_be(:runner) { create(:ci_runner) } + let_it_be(:runner_manager1) do + create(:ci_runner_machine, runner: runner, version: '16.4.0', revision: 'abc', platform: 'linux', + architecture: 'amd64') + end + + let_it_be(:build1) { create(:ci_build, :success, runner_manager: runner_manager1) } + let_it_be(:build2) { create(:ci_build, :canceled) } + let_it_be(:build3) { create(:ci_build, :failed) } + let_it_be(:build4) { create(:ci_build, :pending) } + + before_all do + create_sync_events(*Ci::Build.finished) + end + + context 'when the ci_data_ingestion_to_click_house feature flag is on' do + before do + stub_feature_flags(ci_data_ingestion_to_click_house: true) + end + + context 'when all builds fit in a single batch' do + it 'processes the builds' do + expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original + + expect { execute }.to change { ci_finished_builds_row_count }.by(3) + expect(execute).to have_attributes({ payload: { reached_end_of_table: true, records_inserted: 3 } }) + + records = ci_finished_builds + expect(records.count).to eq 3 + expect(records).to contain_exactly( + a_hash_including(**expected_build_attributes(build1)), + a_hash_including(**expected_build_attributes(build2)), + a_hash_including(**expected_build_attributes(build3)) + ) + end + + it 'processes only builds 
from Ci::FinishedBuildChSyncEvent' do + build = create(:ci_build, :failed) + + expect { execute }.to change { ci_finished_builds_row_count }.by(3) + expect(execute).to have_attributes({ payload: { reached_end_of_table: true, records_inserted: 3 } }) + + create_sync_events(build) + expect { service.execute }.to change { ci_finished_builds_row_count }.by(1) + end + + context 'when a finished build has nil finished_at value' do + it 'skips the build' do + create(:ci_build, :failed, finished_at: nil) + + expect { execute }.to change { ci_finished_builds_row_count }.by(3) + records = ci_finished_builds + expect(records.count).to eq 3 + expect(records).to contain_exactly( + a_hash_including(**expected_build_attributes(build1)), + a_hash_including(**expected_build_attributes(build2)), + a_hash_including(**expected_build_attributes(build3)) + ) + end + end + end + + context 'when multiple batches are required' do + before do + stub_const("#{described_class}::BUILDS_BATCH_SIZE", 2) + end + + it 'processes the builds' do + expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original + + expect { execute }.to change { ci_finished_builds_row_count }.by(3) + expect(execute).to have_attributes({ payload: { reached_end_of_table: true, records_inserted: 3 } }) + end + end + + context 'when multiple CSV uploads are required' do + before do + stub_const("#{described_class}::BUILDS_BATCH_SIZE", 1) + stub_const("#{described_class}::INSERT_BATCH_SIZE", 2) + end + + it 'processes the builds' do + expect(ClickHouse::Client).to receive(:insert_csv).twice.and_call_original + + expect { execute }.to change { ci_finished_builds_row_count }.by(3) + expect(execute).to have_attributes({ payload: { reached_end_of_table: true, records_inserted: 3 } }) + end + + context 'with time limit being reached' do + it 'processes the builds' do + expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original + expect_next_instance_of(Analytics::CycleAnalytics::RuntimeLimiter) do 
|limiter| + expect(limiter).to receive(:over_time?).at_least(1).and_return(true) + end + + expect { execute }.to change { ci_finished_builds_row_count }.by(described_class::BUILDS_BATCH_SIZE) + expect(execute).to have_attributes({ payload: { + reached_end_of_table: false, records_inserted: described_class::BUILDS_BATCH_SIZE + } }) + end + end + + context 'when batches fail to be written to ClickHouse' do + it 'does not mark any records as processed' do + expect(ClickHouse::Client).to receive(:insert_csv) { raise ClickHouse::Client::DatabaseError } + + expect { execute }.to raise_error(ClickHouse::Client::DatabaseError) + .and not_change { Ci::FinishedBuildChSyncEvent.pending.count } + end + end + end + + context 'with multiple calls to service' do + it 'processes the builds' do + expect { service.execute }.to change { ci_finished_builds_row_count }.by(3) + + build4 = create(:ci_build, :failed) + create_sync_events(build4) + + expect { service.execute }.to change { ci_finished_builds_row_count }.by(1) + records = ci_finished_builds + expect(records.count).to eq 4 + expect(records).to contain_exactly( + a_hash_including(**expected_build_attributes(build1)), + a_hash_including(**expected_build_attributes(build2)), + a_hash_including(**expected_build_attributes(build3)), + a_hash_including(**expected_build_attributes(build4)) + ) + end + + context 'with same updated_at value' do + it 'processes the builds' do + expect { service.execute }.to change { ci_finished_builds_row_count }.by(3) + + build4 = create(:ci_build, :failed) + build5 = create(:ci_build, :failed) + create_sync_events(build4, build5) + + expect { execute }.to change { ci_finished_builds_row_count }.by(2) + records = ci_finished_builds + expect(records.count).to eq 5 + expect(records).to contain_exactly( + a_hash_including(**expected_build_attributes(build1)), + a_hash_including(**expected_build_attributes(build2)), + a_hash_including(**expected_build_attributes(build3)), + 
a_hash_including(**expected_build_attributes(build4)), + a_hash_including(**expected_build_attributes(build5)) + ) + end + end + + context 'with older finished_at value' do + it 'does not process the build' do + expect { service.execute }.to change { ci_finished_builds_row_count }.by(3) + + create(:ci_build, :failed) + + expect { service.execute }.not_to change { ci_finished_builds_row_count } + end + end + end + + context 'when no ClickHouse databases are configured' do + before do + allow(ClickHouse::Client.configuration).to receive(:databases).and_return({}) + end + + it 'skips execution' do + is_expected.to have_attributes({ + status: :error, + message: 'ClickHouse database is not configured', + reason: :db_not_configured + }) + end + end + + context 'when exclusive lease error happens' do + context 'when the exclusive lease is already locked for the worker' do + let(:service) { described_class.new(worker_index: 2, total_workers: 3) } + + before do + lock_name = "#{described_class.name.underscore}/worker/2" + allow(service).to receive(:in_lock).with(lock_name, retries: 0, ttl: 360) + .and_raise(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError) + end + + it 'does nothing' do + expect { execute }.not_to change { ci_finished_builds_row_count } + + expect(execute).to have_attributes({ status: :error, reason: :skipped }) + end + end + end + end + + context 'when the ci_data_ingestion_to_click_house feature flag is off' do + before do + stub_feature_flags(ci_data_ingestion_to_click_house: false) + end + + it 'skips execution' do + is_expected.to have_attributes({ + status: :error, + message: 'Feature ci_data_ingestion_to_click_house is disabled', + reason: :disabled + }) + end + end + + def create_sync_events(*builds) + builds.each do |build| + Ci::FinishedBuildChSyncEvent.new(build_id: build.id, build_finished_at: build.finished_at).save! 
+ end + end + + def ci_finished_builds_row_count + ClickHouse::Client.select('SELECT COUNT(*) AS count FROM ci_finished_builds', :main).first['count'] + end + + def ci_finished_builds + ClickHouse::Client + .select('SELECT * FROM ci_finished_builds', :main) + .map(&:symbolize_keys) + end + + def expected_build_attributes(build) + runner = build.runner + runner_manager = build.runner_manager + + { + id: build.id, status: build.status, project_id: build.project_id, pipeline_id: build.pipeline_id, + created_at: a_value_within(1.second).of(build.created_at), + started_at: a_value_within(1.second).of(build.started_at), + queued_at: a_value_within(1.second).of(build.queued_at), + finished_at: a_value_within(1.second).of(build.finished_at), + runner_id: runner&.id || 0, + runner_type: Ci::Runner.runner_types.fetch(runner&.runner_type, 0), + runner_run_untagged: runner&.run_untagged || false, + runner_manager_system_xid: runner_manager&.system_xid || '', + runner_manager_version: runner_manager&.version || '', + runner_manager_revision: runner_manager&.revision || '', + runner_manager_platform: runner_manager&.platform || '', + runner_manager_architecture: runner_manager&.architecture || '' + } + end +end diff --git a/ee/spec/workers/ee/ci/build_finished_worker_spec.rb b/ee/spec/workers/ee/ci/build_finished_worker_spec.rb index 5700f28bad6c5e015d9a5c40ddedc298a2a0646a..1b27521603fc2bb4b3072725e268b9bff9b24399 100644 --- a/ee/spec/workers/ee/ci/build_finished_worker_spec.rb +++ b/ee/spec/workers/ee/ci/build_finished_worker_spec.rb @@ -4,7 +4,10 @@ RSpec.describe Ci::BuildFinishedWorker, feature_category: :continuous_integration do let_it_be(:ci_runner) { create(:ci_runner) } - let_it_be_with_reload(:build) { create(:ee_ci_build, :sast, :success, runner: ci_runner) } + let_it_be_with_reload(:build) do + create(:ee_ci_build, :sast, :success, runner: ci_runner, finished_at: 1.hour.ago) + end + let_it_be(:project) { build.project } let_it_be(:namespace) { 
project.shared_runners_limit_namespace } @@ -166,5 +169,39 @@ def project_stats perform end end + + describe 'finished builds sync event' do + before do + stub_licensed_features(runner_performance_insights: runner_performance_insights) + end + + context 'when feature is not available' do + let(:runner_performance_insights) { false } + + it 'does not save job on Ci::FinishedBuildChSyncEvent by default' do + expect { perform }.not_to change { Ci::FinishedBuildChSyncEvent.count } + end + end + + context 'when feature is available' do + let(:runner_performance_insights) { true } + + it 'saves job on Ci::FinishedBuildChSyncEvent by default' do + expect { perform }.to change { Ci::FinishedBuildChSyncEvent.all } + .from([]) + .to([an_object_having_attributes(build_id: build.id, build_finished_at: build.finished_at)]) + end + + context 'when generate_ci_finished_builds_sync_events FF is disabled' do + before do + stub_feature_flags(generate_ci_finished_builds_sync_events: false) + end + + it 'does not save job on Ci::FinishedBuildChSyncEvent by default' do + expect { perform }.not_to change { Ci::FinishedBuildChSyncEvent.count } + end + end + end + end end end diff --git a/lib/gitlab/exclusive_lease_helpers.rb b/lib/gitlab/exclusive_lease_helpers.rb index 7cf0232fbf2de82cc6e1c2a0a7e57c4af0477ede..8836cb34745cc735d0a3d0b7a03f6cac73d8b1e2 100644 --- a/lib/gitlab/exclusive_lease_helpers.rb +++ b/lib/gitlab/exclusive_lease_helpers.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true module Gitlab - # This module provides helper methods which are intregrated with GitLab::ExclusiveLease + # This module provides helper methods which are integrated with GitLab::ExclusiveLease module ExclusiveLeaseHelpers FailedToObtainLockError = Class.new(StandardError) diff --git a/spec/factories/ci/builds.rb b/spec/factories/ci/builds.rb index 7325ab309894de61579d2e3091400931ba2a922e..f0b09194047e91a0adfcc850349e99435d294491 100644 --- a/spec/factories/ci/builds.rb +++ b/spec/factories/ci/builds.rb 
@@ -30,6 +30,15 @@ ref { pipeline.ref } + runner_manager { nil } + + after(:build) do |build, evaluator| + if evaluator.runner_manager + build.runner = evaluator.runner_manager.runner + create(:ci_runner_machine_build, build: build, runner_manager: evaluator.runner_manager) + end + end + trait :with_token do transient do generate_token { true }