From fc5c1eb149dcba2c0ec97e0685012ec20ce58993 Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Mon, 18 Sep 2023 11:10:48 +0200 Subject: [PATCH 01/10] Populate p_ci_finished_build_ch_sync_events EE: true --- ee/app/workers/ee/ci/build_finished_worker.rb | 9 +++++ ...enerate_ci_finished_builds_sync_events.yml | 8 ++++ .../ee/ci/build_finished_worker_spec.rb | 39 ++++++++++++++++++- 3 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 ee/config/feature_flags/development/generate_ci_finished_builds_sync_events.yml diff --git a/ee/app/workers/ee/ci/build_finished_worker.rb b/ee/app/workers/ee/ci/build_finished_worker.rb index a27e7894731419..e7455c990f42f8 100644 --- a/ee/app/workers/ee/ci/build_finished_worker.rb +++ b/ee/app/workers/ee/ci/build_finished_worker.rb @@ -18,6 +18,10 @@ def process_build(build) end ::Ci::InstanceRunnerFailedJobs.track(build) if build.failed? + + if build.finished_at.present? && generate_finished_builds_sync_events? + ::Ci::FinishedBuildChSyncEvent.new(build_id: build.id, build_finished_at: build.finished_at).save + end end private @@ -31,6 +35,11 @@ def requirements_available?(build) !build.project.requirements.empty? && Ability.allowed?(build.user, :create_requirement_test_report, build.project) end + + def generate_finished_builds_sync_events? + ::Feature.enabled?(:generate_ci_finished_builds_sync_events) && + ::License.feature_available?(:runner_performance_insights) + end end end end diff --git a/ee/config/feature_flags/development/generate_ci_finished_builds_sync_events.yml b/ee/config/feature_flags/development/generate_ci_finished_builds_sync_events.yml new file mode 100644 index 00000000000000..4d5f6073334eda --- /dev/null +++ b/ee/config/feature_flags/development/generate_ci_finished_builds_sync_events.yml @@ -0,0 +1,8 @@ +--- +name: generate_ci_finished_builds_sync_events +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/132010 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/424866 +milestone: '16.5' +type: development +group: group::runner +default_enabled: false diff --git a/ee/spec/workers/ee/ci/build_finished_worker_spec.rb b/ee/spec/workers/ee/ci/build_finished_worker_spec.rb index 5700f28bad6c5e..1b27521603fc2b 100644 --- a/ee/spec/workers/ee/ci/build_finished_worker_spec.rb +++ b/ee/spec/workers/ee/ci/build_finished_worker_spec.rb @@ -4,7 +4,10 @@ RSpec.describe Ci::BuildFinishedWorker, feature_category: :continuous_integration do let_it_be(:ci_runner) { create(:ci_runner) } - let_it_be_with_reload(:build) { create(:ee_ci_build, :sast, :success, runner: ci_runner) } + let_it_be_with_reload(:build) do + create(:ee_ci_build, :sast, :success, runner: ci_runner, finished_at: 1.hour.ago) + end + let_it_be(:project) { build.project } let_it_be(:namespace) { project.shared_runners_limit_namespace } @@ -166,5 +169,39 @@ def project_stats perform end end + + describe 'finished builds sync event' do + before do + stub_licensed_features(runner_performance_insights: runner_performance_insights) + end + + context 'when feature is not available' do + let(:runner_performance_insights) { false } + + it 'does not save job on Ci::FinishedBuildChSyncEvent by default' do + expect { perform }.not_to change { Ci::FinishedBuildChSyncEvent.count } + end + end + + context 'when feature is available' do + let(:runner_performance_insights) { true } + + it 'saves job on Ci::FinishedBuildChSyncEvent by default' do + expect { perform }.to change { Ci::FinishedBuildChSyncEvent.all } + .from([]) + .to([an_object_having_attributes(build_id: build.id, build_finished_at: build.finished_at)]) + end + + context 'when generate_ci_finished_builds_sync_events FF is disabled' do + before do + stub_feature_flags(generate_ci_finished_builds_sync_events: false) + end + + it 'does not save job on Ci::FinishedBuildChSyncEvent by default' do + expect { perform }.not_to change { Ci::FinishedBuildChSyncEvent.count } + end + end + end + end end end -- GitLab From dd5bf1af60a3472c5cc153df9eb2b3c12cfbf14e Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Mon, 11 Sep 2023 19:28:19 +0200 Subject: [PATCH 02/10] Introduce service to sync finished builds to ClickHouse EE: true --- .../models/ci/finished_build_ch_sync_event.rb | 2 + .../ci_finished_builds_sync_service.rb | 152 ++++++++++ .../ci_data_ingestion_to_click_house.yml | 8 + .../ci/finished_build_ch_sync_event_spec.rb | 16 +- .../ci_finished_builds_sync_service_spec.rb | 277 ++++++++++++++++++ lib/gitlab/exclusive_lease_helpers.rb | 2 +- spec/factories/ci/builds.rb | 9 + 7 files changed, 459 insertions(+), 7 deletions(-) create mode 100644 ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb create mode 100644 ee/config/feature_flags/development/ci_data_ingestion_to_click_house.yml create mode 100644 ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb diff --git a/ee/app/models/ci/finished_build_ch_sync_event.rb b/ee/app/models/ci/finished_build_ch_sync_event.rb index 4e73c98a3e1399..64f9bb1cdc75c9 100644 --- a/ee/app/models/ci/finished_build_ch_sync_event.rb +++ b/ee/app/models/ci/finished_build_ch_sync_event.rb @@ -2,6 +2,8 @@ module Ci class FinishedBuildChSyncEvent < Ci::ApplicationRecord + include EachBatch + PARTITION_DURATION = 1.day include PartitionedTable diff --git a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb new file mode 100644 index 00000000000000..6bad027ed86df0 --- /dev/null +++ b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb @@ -0,0 +1,152 @@ +# frozen_string_literal: true + +module ClickHouse + module DataIngestion + class CiFinishedBuildsSyncService + include Gitlab::ExclusiveLeaseHelpers + include Gitlab::Utils::StrongMemoize + + # the job is scheduled every 3 minutes and we will allow maximum 2.5 minutes runtime + MAX_TTL = 2.5.minutes.to_i + MAX_RUNTIME = 120.seconds + BUILDS_BATCH_SIZE = 500 + INSERT_BATCH_SIZE = 5000 + + def initialize(worker_index: 0, total_workers: 1) + @runtime_limiter = Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME) + @worker_index = worker_index + @total_workers = total_workers + end + + def execute + @total_record_count = 0 + + return ServiceResponse.success(payload: { status: :disabled }) unless enabled? + + unless ClickHouse::Client.configuration.databases[:main].present? + return ServiceResponse.error( + message: 'ClickHouse database is not configured', payload: { status: :db_not_configured } + ) + end + + # Prevent parallel jobs + in_lock("#{self.class.name.underscore}/worker/#{@worker_index}", ttl: MAX_TTL, retries: 0) do + ::Gitlab::Database::LoadBalancing::Session.without_sticky_writes do + report = insert_new_finished_builds + + ServiceResponse.success(payload: report) + end + end + rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError => e + # Skip retrying, just let the next worker to start after a few minutes + ServiceResponse.error(message: e.message, payload: { status: :skipped }) + end + + private + + def insert_new_finished_builds + loop do + Ci::FinishedBuildChSyncEvent.transaction do + CsvBuilder::Gzip.new(process_batch, CSV_MAPPING).render do |tempfile| + ClickHouse::Client.insert_csv(INSERT_FINISHED_BUILDS_QUERY, File.open(tempfile.path), :main) + end + end + + unless continue? + return { + status: :processed, + records_inserted: @total_record_count, + reached_end_of_table: !@more_records + } + end + end + end + + BUILD_FIELD_NAMES = %i[id project_id pipeline_id status runner_id].freeze + BUILD_EPOCH_FIELD_NAMES = %i[created_at queued_at started_at finished_at].freeze + RUNNER_FIELD_NAMES = %i[run_untagged type].freeze + RUNNER_MANAGER_FIELD_NAMES = %i[system_xid version revision platform architecture].freeze + + CSV_MAPPING = { + **BUILD_FIELD_NAMES.index_with { |n| n }, + **BUILD_EPOCH_FIELD_NAMES.index_with { |n| "casted_#{n}".to_sym }, + **RUNNER_FIELD_NAMES.map { |n| "runner_#{n}".to_sym }.index_with { |n| n }, + **RUNNER_MANAGER_FIELD_NAMES.map { |n| "runner_manager_#{n}".to_sym }.index_with { |n| n } + }.freeze + + INSERT_FINISHED_BUILDS_QUERY = <<~SQL.squish + INSERT INTO ci_finished_builds (#{CSV_MAPPING.keys.join(', ')}) + SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV + SQL + + def enabled? + Feature.enabled?(:ci_data_ingestion_to_click_house) + end + + def finished_build_projections + [ + *BUILD_FIELD_NAMES, + *BUILD_EPOCH_FIELD_NAMES.map { |n| "EXTRACT(epoch FROM #{::Ci::Build.table_name}.#{n}) AS casted_#{n}" }, + "#{::Ci::Runner.table_name}.run_untagged AS runner_run_untagged", + "#{::Ci::Runner.table_name}.runner_type AS runner_type", + *RUNNER_MANAGER_FIELD_NAMES.map { |n| "#{::Ci::RunnerManager.table_name}.#{n} AS runner_manager_#{n}" } + ] + end + strong_memoize_attr :finished_build_projections + + def continue? + @more_records && !@runtime_limiter.over_time? + end + + def process_batch + Enumerator.new do |yielder| + @more_records = false + + keyset_iterator_scope.each_batch(of: BUILDS_BATCH_SIZE) do |events_batch| + total_records_yielded = 0 + + build_ids = events_batch.pluck(:build_id) # rubocop: disable CodeReuse/ActiveRecord + Ci::Build.id_in(build_ids) + .left_outer_joins(:runner, :runner_manager) + .select(:finished_at, *finished_build_projections) + .each do |build| + yielder << build + total_records_yielded += 1 + end + + Ci::FinishedBuildChSyncEvent.primary_key_in(build_ids).update_all(processed: true) + + @more_records = total_records_yielded == BUILDS_BATCH_SIZE + @total_record_count += total_records_yielded + break unless continue? && total_records_yielded < INSERT_BATCH_SIZE + end + end + end + + def keyset_iterator_scope + lower_bound = (@worker_index * 100 / @total_workers).to_i + upper_bound = ((@worker_index + 1) * 100 / @total_workers).to_i + + table_name = Ci::FinishedBuildChSyncEvent.quoted_table_name + # rubocop: disable CodeReuse/ActiveRecord + array_scope = Ci::FinishedBuildChSyncEvent + .select(:build_id_partition) + .from("generate_series(#{lower_bound}, #{upper_bound}) as #{table_name}(build_id_partition)") + + scope = Ci::FinishedBuildChSyncEvent.pending.order(:build_id) + # rubocop: enable CodeReuse/ActiveRecord + + opts = { + in_operator_optimization_options: { + array_scope: array_scope, + array_mapping_scope: ->(id_expression) do + Ci::FinishedBuildChSyncEvent.where(Arel.sql('(build_id % 100)').eq(id_expression)) # rubocop: disable CodeReuse/ActiveRecord + end + } + } + + Gitlab::Pagination::Keyset::Iterator.new(scope: scope, **opts) + end + end + end +end diff --git a/ee/config/feature_flags/development/ci_data_ingestion_to_click_house.yml b/ee/config/feature_flags/development/ci_data_ingestion_to_click_house.yml new file mode 100644 index 00000000000000..40bd88964291ee --- /dev/null +++ b/ee/config/feature_flags/development/ci_data_ingestion_to_click_house.yml @@ -0,0 +1,8 @@ +--- +name: ci_data_ingestion_to_click_house +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/132010 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/424866 +milestone: '16.5' +type: development +group: group::runner +default_enabled: false diff --git a/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb b/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb index 5071719242d720..8c6d006d83c98e 100644 --- a/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb +++ b/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb @@ -10,14 +10,18 @@ it { is_expected.to validate_presence_of(:build_finished_at) } end - describe '.pending' do - subject(:scope) { described_class.pending } - + describe 'scopes' do let_it_be(:event1) { described_class.create!(build_id: 1, build_finished_at: 2.hours.ago, processed: true) } - let_it_be(:event2) { described_class.create!(build_id: 2, build_finished_at: 1.hour.ago) } - let_it_be(:event3) { described_class.create!(build_id: 3, build_finished_at: 1.hour.ago, processed: true) } + let_it_be(:event2) { described_class.create!(build_id: 32, build_finished_at: 1.hour.ago) } + let_it_be(:event3) { described_class.create!(build_id: 133, build_finished_at: 1.hour.ago) } + let_it_be(:event4) { described_class.create!(build_id: 266, build_finished_at: 1.hour.ago, processed: true) } + let_it_be(:event5) { described_class.create!(build_id: 367, build_finished_at: 1.hour.ago, processed: true) } + + describe '.pending' do + subject(:scope) { described_class.pending } - it { is_expected.to contain_exactly(event2) } + it { is_expected.to contain_exactly(event2, event3) } + end end describe '.for_partition', :freeze_time do diff --git a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb new file mode 100644 index 00000000000000..34da91344683f3 --- /dev/null +++ b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb @@ -0,0 +1,277 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ClickHouse::DataIngestion::CiFinishedBuildsSyncService, + :click_house, feature_category: :runner_fleet do + subject(:execute) { service.execute } + + let(:service) { described_class.new } + + let_it_be(:runner) { create(:ci_runner) } + let_it_be(:runner_manager1) do + create(:ci_runner_machine, runner: runner, version: '16.4.0', revision: 'abc', platform: 'linux', + architecture: 'amd64') + end + + let_it_be(:build1) do + create(:ci_build, :success, runner_manager: runner_manager1, created_at: 1.hour.ago, + queued_at: 59.minutes.ago, started_at: 57.minutes.ago, finished_at: 30.minutes.ago) + end + + let_it_be(:build2) do + create(:ci_build, :canceled, created_at: 30.minutes.ago, queued_at: 30.minutes.ago, + started_at: 29.minutes.ago, finished_at: 15.minutes.ago) + end + + let_it_be(:build3) do + create(:ci_build, :failed, created_at: 30.minutes.ago, queued_at: 30.minutes.ago, + started_at: 29.minutes.ago, finished_at: 15.minutes.ago) + end + + let_it_be(:build4) do + create(:ci_build, :pending) + end + + before do + create_sync_events(*Ci::Build.finished) + end + + context 'when the ci_data_ingestion_to_click_house feature flag is on' do + before do + stub_feature_flags(ci_data_ingestion_to_click_house: true) + end + + context 'when all builds fit in a single batch' do + it 'processes the builds' do + expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original + + expect { execute }.to change { ci_finished_builds_row_count }.by(3) + expect(execute).to have_attributes({ payload: { + reached_end_of_table: true, records_inserted: 3, status: :processed + } }) + + records_by_id = ci_finished_builds_by_id + expect(records_by_id.count).to eq 3 + expect(records_by_id).to match( + build1.id => a_hash_including(**expected_build_attributes(build1)), + build2.id => a_hash_including(**expected_build_attributes(build2)), + build3.id => a_hash_including(**expected_build_attributes(build3)) + ) + end + + it 'processes only builds from Ci::FinishedBuildChSyncEvent' do + build = create(:ci_build, :failed, created_at: 30.minutes.ago, queued_at: 30.minutes.ago, + started_at: 30.minutes.ago, finished_at: 1.minute.ago) + + expect { execute }.to change { ci_finished_builds_row_count }.by(3) + expect(execute).to have_attributes({ payload: { + reached_end_of_table: true, records_inserted: 3, status: :processed + } }) + + create_sync_events(build) + expect { service.execute }.to change { ci_finished_builds_row_count }.by(1) + end + + context 'when a finished build has nil finished_at value' do + it 'skips the build' do + create(:ci_build, :failed, created_at: 30.minutes.ago, queued_at: 30.minutes.ago, + started_at: 30.minutes.ago, finished_at: nil) + + expect { execute }.to change { ci_finished_builds_row_count }.by(3) + records_by_id = ci_finished_builds_by_id + expect(records_by_id.count).to eq 3 + expect(records_by_id).to match( + build1.id => a_hash_including(**expected_build_attributes(build1)), + build2.id => a_hash_including(**expected_build_attributes(build2)), + build3.id => a_hash_including(**expected_build_attributes(build3)) + ) + end + end + end + + context 'when multiple batches are required' do + before do + stub_const("#{described_class}::BUILDS_BATCH_SIZE", 2) + end + + it 'processes the builds' do + expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original + + expect { execute }.to change { ci_finished_builds_row_count }.by(2) + expect(execute).to have_attributes({ payload: { + reached_end_of_table: true, records_inserted: 2, status: :processed + } }) + end + end + + context 'when multiple CSV uploads are required' do + before do + stub_const("#{described_class}::BUILDS_BATCH_SIZE", 2) + stub_const("#{described_class}::INSERT_BATCH_SIZE", 2) + end + + it 'processes the builds' do + expect(ClickHouse::Client).to receive(:insert_csv).twice.and_call_original + + expect { execute }.to change { ci_finished_builds_row_count }.by(3) + expect(execute).to have_attributes({ payload: { + reached_end_of_table: true, records_inserted: 3, status: :processed + } }) + end + + context 'with time limit being reached' do + it 'processes the builds' do + expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original + expect_next_instance_of(Analytics::CycleAnalytics::RuntimeLimiter) do |limiter| + expect(limiter).to receive(:over_time?).at_least(1).and_return(true) + end + + expect { execute }.to change { ci_finished_builds_row_count }.by(2) + expect(execute).to have_attributes({ payload: { + reached_end_of_table: false, records_inserted: 2, status: :processed + } }) + end + end + + context 'when batches fail to be written to ClickHouse' do + it 'does not mark any records as processed' do + expect(ClickHouse::Client).to receive(:insert_csv) { raise ClickHouse::Client::DatabaseError } + + expect { execute }.to raise_error(ClickHouse::Client::DatabaseError) + .and not_change { Ci::FinishedBuildChSyncEvent.pending.count } + end + end + end + + context 'with multiple calls to service' do + it 'processes the builds' do + expect { service.execute }.to change { ci_finished_builds_row_count }.by(3) + + build4 = create(:ci_build, :failed, created_at: 3.minutes.ago, queued_at: 3.minutes.ago, + started_at: 2.minutes.ago, finished_at: 1.minute.ago) + create_sync_events(build4) + + expect { service.execute }.to change { ci_finished_builds_row_count }.by(1) + records_by_id = ci_finished_builds_by_id + expect(records_by_id.count).to eq 4 + expect(records_by_id).to match( + build1.id => a_hash_including(**expected_build_attributes(build1)), + build2.id => a_hash_including(**expected_build_attributes(build2)), + build3.id => a_hash_including(**expected_build_attributes(build3)), + build4.id => a_hash_including(**expected_build_attributes(build4)) + ) + end + + context 'with same updated_at value' do + it 'processes the builds' do + expect { service.execute }.to change { ci_finished_builds_row_count }.by(3) + + build4 = create(:ci_build, :failed, created_at: 3.minutes.ago, queued_at: 3.minutes.ago, + started_at: 2.minutes.ago, finished_at: 1.minute.ago) + build5 = create(:ci_build, :failed, created_at: 2.minutes.ago, queued_at: 2.minutes.ago, + started_at: 2.minutes.ago, finished_at: 1.minute.ago) + create_sync_events(build4, build5) + + expect { execute }.to change { ci_finished_builds_row_count }.by(2) + records_by_id = ci_finished_builds_by_id + expect(records_by_id.count).to eq 5 + expect(records_by_id).to match( + build1.id => a_hash_including(**expected_build_attributes(build1)), + build2.id => a_hash_including(**expected_build_attributes(build2)), + build3.id => a_hash_including(**expected_build_attributes(build3)), + build4.id => a_hash_including(**expected_build_attributes(build4)), + build5.id => a_hash_including(**expected_build_attributes(build5)) + ) + end + end + + context 'with older finished_at value' do + it 'does not process the build' do + expect { service.execute }.to change { ci_finished_builds_row_count }.by(3) + + create(:ci_build, :failed, created_at: 20.minutes.ago, queued_at: 20.minutes.ago, + started_at: 20.minutes.ago, finished_at: 19.minutes.ago) + + expect { service.execute }.not_to change { ci_finished_builds_row_count } + end + end + end + + context 'when no ClickHouse databases are configured' do + before do + allow(ClickHouse::Client.configuration).to receive(:databases).and_return({}) + end + + it 'skips execution' do + is_expected.to have_attributes({ payload: { status: :db_not_configured } }) + end + end + + context 'when exclusive lease error happens' do + context 'when the exclusive lease is already locked for the worker' do + let(:service) { described_class.new(worker_index: 2, total_workers: 3) } + + before do + lock_name = "#{described_class.name.underscore}/worker/2" + allow(service).to receive(:in_lock).with(lock_name, retries: 0, ttl: 150) + .and_raise(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError) + end + + it 'does nothing' do + expect { execute }.not_to change { ci_finished_builds_row_count } + + expect(execute).to have_attributes({ status: :error, payload: { status: :skipped } }) + end + end + end + end + + context 'when the ci_data_ingestion_to_click_house feature flag is off' do + before do + stub_feature_flags(ci_data_ingestion_to_click_house: false) + end + + it 'skips execution' do + is_expected.to have_attributes({ payload: { status: :disabled } }) + end + end + + def create_sync_events(*builds) + builds.each do |build| + Ci::FinishedBuildChSyncEvent.new(build_id: build.id, build_finished_at: build.finished_at).save! + end + end + + def ci_finished_builds_row_count + ClickHouse::Client.select('SELECT COUNT(*) AS count FROM ci_finished_builds', :main).first['count'] + end + + def ci_finished_builds_by_id + ClickHouse::Client + .select('SELECT * FROM ci_finished_builds', :main) + .map(&:symbolize_keys) + .index_by { |r| r[:id] } + end + + def expected_build_attributes(build) + runner = build.runner + runner_manager = build.runner_manager + + { + id: build.id, status: build.status, project_id: build.project_id, pipeline_id: build.pipeline_id, + created_at: a_value_within(1.second).of(build.created_at), + started_at: a_value_within(1.second).of(build.started_at), + queued_at: a_value_within(1.second).of(build.queued_at), + finished_at: a_value_within(1.second).of(build.finished_at), + runner_id: runner&.id || 0, + runner_type: Ci::Runner.runner_types.fetch(runner&.runner_type, 0), + runner_run_untagged: runner&.run_untagged || false, + runner_manager_system_xid: runner_manager&.system_xid || '', + runner_manager_version: runner_manager&.version || '', + runner_manager_revision: runner_manager&.revision || '', + runner_manager_platform: runner_manager&.platform || '', + runner_manager_architecture: runner_manager&.architecture || '' + } + end +end diff --git a/lib/gitlab/exclusive_lease_helpers.rb b/lib/gitlab/exclusive_lease_helpers.rb index 7cf0232fbf2de8..8836cb34745cc7 100644 --- a/lib/gitlab/exclusive_lease_helpers.rb +++ b/lib/gitlab/exclusive_lease_helpers.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true module Gitlab - # This module provides helper methods which are intregrated with GitLab::ExclusiveLease + # This module provides helper methods which are integrated with GitLab::ExclusiveLease module ExclusiveLeaseHelpers FailedToObtainLockError = Class.new(StandardError) diff --git a/spec/factories/ci/builds.rb b/spec/factories/ci/builds.rb index 7325ab309894de..f0b09194047e91 100644 --- a/spec/factories/ci/builds.rb +++ b/spec/factories/ci/builds.rb @@ -30,6 +30,15 @@ ref { pipeline.ref } + runner_manager { nil } + + after(:build) do |build, evaluator| + if evaluator.runner_manager + build.runner = evaluator.runner_manager.runner + create(:ci_runner_machine_build, build: build, runner_manager: evaluator.runner_manager) + end + end + trait :with_token do transient do generate_token { true } -- GitLab From 85cfebac5ee5be89b0f30aecbc25bde0056f7b27 Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Tue, 26 Sep 2023 10:39:07 +0200 Subject: [PATCH 03/10] Increase MAX_TTL --- .../data_ingestion/ci_finished_builds_sync_service.rb | 5 +++-- .../data_ingestion/ci_finished_builds_sync_service_spec.rb | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb index 6bad027ed86df0..7ca57877ecbc64 100644 --- a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb +++ b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb @@ -6,8 +6,9 @@ class CiFinishedBuildsSyncService include Gitlab::ExclusiveLeaseHelpers include Gitlab::Utils::StrongMemoize - # the job is scheduled every 3 minutes and we will allow maximum 2.5 minutes runtime - MAX_TTL = 2.5.minutes.to_i + # the job is scheduled every 4 minutes and we will allow maximum 3.25 minutes runtime + # (2 minutes + 15 seconds PG timeout + 1 minute for the various CH Gitlab::HTTP timeouts) + MAX_TTL = 3.25.minutes.to_i MAX_RUNTIME = 120.seconds BUILDS_BATCH_SIZE = 500 INSERT_BATCH_SIZE = 5000 diff --git a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb index 34da91344683f3..89621da850d477 100644 --- a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb +++ b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb @@ -214,7 +214,7 @@ before do lock_name = "#{described_class.name.underscore}/worker/2" - allow(service).to receive(:in_lock).with(lock_name, retries: 0, ttl: 150) + allow(service).to receive(:in_lock).with(lock_name, retries: 0, ttl: 195) .and_raise(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError) end -- GitLab From 83f8f96d66e4e58a5922091a3bb5135d94746da9 Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Tue, 26 Sep 2023 10:50:44 +0200 Subject: [PATCH 04/10] Address MR review comment --- .../data_ingestion/ci_finished_builds_sync_service.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb index 7ca57877ecbc64..770f316de9e097 100644 --- a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb +++ b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb @@ -117,7 +117,7 @@ def process_batch Ci::FinishedBuildChSyncEvent.primary_key_in(build_ids).update_all(processed: true) - @more_records = total_records_yielded == BUILDS_BATCH_SIZE + @more_records = build_ids.count == BUILDS_BATCH_SIZE @total_record_count += total_records_yielded break unless continue? && total_records_yielded < INSERT_BATCH_SIZE end -- GitLab From 0f130c63edb5c7a9bf9eea71c40c673a8aeec739 Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Tue, 26 Sep 2023 14:34:11 +0200 Subject: [PATCH 05/10] Remove use of transaction --- .../ci_finished_builds_sync_service.rb | 20 ++++++++++--------- .../ci_finished_builds_sync_service_spec.rb | 6 +++--- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb index 770f316de9e097..8e9e79af00cc89 100644 --- a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb +++ b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb @@ -6,9 +6,9 @@ class CiFinishedBuildsSyncService include Gitlab::ExclusiveLeaseHelpers include Gitlab::Utils::StrongMemoize - # the job is scheduled every 4 minutes and we will allow maximum 3.25 minutes runtime - # (2 minutes + 15 seconds PG timeout + 1 minute for the various CH Gitlab::HTTP timeouts) - MAX_TTL = 3.25.minutes.to_i + # the job is scheduled every 3 minutes and we will allow maximum 6 minutes runtime + # we must allow a minimum of 2 minutes + 15 seconds PG timeout + 1 minute for the various CH Gitlab::HTTP timeouts + MAX_TTL = 6.minutes.to_i MAX_RUNTIME = 120.seconds BUILDS_BATCH_SIZE = 500 INSERT_BATCH_SIZE = 5000 @@ -47,12 +47,15 @@ def execute def insert_new_finished_builds loop do - Ci::FinishedBuildChSyncEvent.transaction do - CsvBuilder::Gzip.new(process_batch, CSV_MAPPING).render do |tempfile| - ClickHouse::Client.insert_csv(INSERT_FINISHED_BUILDS_QUERY, File.open(tempfile.path), :main) - end + @processed_record_ids = [] + + CsvBuilder::Gzip.new(process_batch, CSV_MAPPING).render do |tempfile| + ClickHouse::Client.insert_csv(INSERT_FINISHED_BUILDS_QUERY, File.open(tempfile.path), :main) end + Ci::FinishedBuildChSyncEvent.primary_key_in(@processed_record_ids).update_all(processed: true) + @processed_record_ids = [] + unless continue? return { status: :processed, @@ -112,11 +115,10 @@ def process_batch .select(:finished_at, *finished_build_projections) .each do |build| yielder << build + @processed_record_ids << build.id total_records_yielded += 1 end - Ci::FinishedBuildChSyncEvent.primary_key_in(build_ids).update_all(processed: true) - @more_records = build_ids.count == BUILDS_BATCH_SIZE @total_record_count += total_records_yielded break unless continue? && total_records_yielded < INSERT_BATCH_SIZE diff --git a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb index 89621da850d477..bde6c8077f9c9b 100644 --- a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb +++ b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb @@ -98,9 +98,9 @@ it 'processes the builds' do expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original - expect { execute }.to change { ci_finished_builds_row_count }.by(2) + expect { execute }.to change { ci_finished_builds_row_count }.by(3) expect(execute).to have_attributes({ payload: { - reached_end_of_table: true, records_inserted: 2, status: :processed + reached_end_of_table: true, records_inserted: 3, status: :processed } }) end end @@ -214,7 +214,7 @@ before do lock_name = "#{described_class.name.underscore}/worker/2" - allow(service).to receive(:in_lock).with(lock_name, retries: 0, ttl: 195) + allow(service).to receive(:in_lock).with(lock_name, retries: 0, ttl: 360) .and_raise(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError) end -- GitLab From 89bdbcfc006ddb04ca457f668d28459309ed4b6b Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Wed, 27 Sep 2023 17:53:58 +0200 Subject: [PATCH 06/10] Fix bug causing insert_csv to be called with all the data --- .../data_ingestion/ci_finished_builds_sync_service.rb | 7 ++++--- .../data_ingestion/ci_finished_builds_sync_service_spec.rb | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb index 8e9e79af00cc89..0db526eb9a775d 100644 --- a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb +++ b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb @@ -106,9 +106,9 @@ def process_batch Enumerator.new do |yielder| @more_records = false - keyset_iterator_scope.each_batch(of: BUILDS_BATCH_SIZE) do |events_batch| - total_records_yielded = 0 + total_records_yielded = 0 + keyset_iterator_scope.each_batch(of: BUILDS_BATCH_SIZE) do |events_batch| build_ids = events_batch.pluck(:build_id) # rubocop: disable CodeReuse/ActiveRecord Ci::Build.id_in(build_ids) .left_outer_joins(:runner, :runner_manager) @@ -120,9 +120,10 @@ def process_batch end @more_records = build_ids.count == BUILDS_BATCH_SIZE - @total_record_count += total_records_yielded break unless continue? && total_records_yielded < INSERT_BATCH_SIZE end + + @total_record_count += @processed_record_ids.count end end diff --git a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb index bde6c8077f9c9b..99de506cd76904 100644 --- a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb +++ b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb @@ -107,7 +107,7 @@ context 'when multiple CSV uploads are required' do before do - stub_const("#{described_class}::BUILDS_BATCH_SIZE", 2) + stub_const("#{described_class}::BUILDS_BATCH_SIZE", 1) stub_const("#{described_class}::INSERT_BATCH_SIZE", 2) end -- GitLab From b43bec679649eb121e59fa52fa66a47d911406ee Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Wed, 27 Sep 2023 18:37:20 +0200 Subject: [PATCH 07/10] Close file object --- .../data_ingestion/ci_finished_builds_sync_service.rb | 4 +++- .../data_ingestion/ci_finished_builds_sync_service_spec.rb | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb index 0db526eb9a775d..a2108c04f8d89a 100644 --- a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb +++ b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb @@ -50,7 +50,9 @@ def insert_new_finished_builds @processed_record_ids = [] CsvBuilder::Gzip.new(process_batch, CSV_MAPPING).render do |tempfile| - ClickHouse::Client.insert_csv(INSERT_FINISHED_BUILDS_QUERY, File.open(tempfile.path), :main) + File.open(tempfile.path) do |f| + ClickHouse::Client.insert_csv(INSERT_FINISHED_BUILDS_QUERY, f, :main) + end end Ci::FinishedBuildChSyncEvent.primary_key_in(@processed_record_ids).update_all(processed: true) diff --git a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb index 99de506cd76904..0d01efbe876340 100644 --- a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb +++ b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb @@ -127,9 +127,9 @@ expect(limiter).to receive(:over_time?).at_least(1).and_return(true) end - expect { execute }.to change { ci_finished_builds_row_count }.by(2) + expect { execute }.to change { ci_finished_builds_row_count }.by(described_class::BUILDS_BATCH_SIZE) expect(execute).to have_attributes({ payload: { - reached_end_of_table: false, records_inserted: 2, status: :processed + reached_end_of_table: false, records_inserted: described_class::BUILDS_BATCH_SIZE, status: :processed } }) end end -- GitLab From 6f17c8b0dc44d182131f3a47c2f2d6156fed6f40 Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Wed, 27 Sep 2023 22:39:07 +0200 Subject: [PATCH 08/10] Revert unneeded change --- .../ci/finished_build_ch_sync_event_spec.rb | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb b/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb index 8c6d006d83c98e..5071719242d720 100644 --- a/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb +++ b/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb @@ -10,18 +10,14 @@ it { is_expected.to validate_presence_of(:build_finished_at) } end - describe 'scopes' do - let_it_be(:event1) { described_class.create!(build_id: 1, build_finished_at: 2.hours.ago, processed: true) } - let_it_be(:event2) { described_class.create!(build_id: 32, build_finished_at: 1.hour.ago) } - let_it_be(:event3) { described_class.create!(build_id: 133, build_finished_at: 1.hour.ago) } - let_it_be(:event4) { described_class.create!(build_id: 266, build_finished_at: 1.hour.ago, processed: true) } - let_it_be(:event5) { described_class.create!(build_id: 367, build_finished_at: 1.hour.ago, processed: true) } + describe '.pending' do + subject(:scope) { described_class.pending } - describe '.pending' do - subject(:scope) { described_class.pending } + let_it_be(:event1) { described_class.create!(build_id: 1, build_finished_at: 2.hours.ago, processed: true) } + let_it_be(:event2) { described_class.create!(build_id: 2, build_finished_at: 1.hour.ago) } + let_it_be(:event3) { described_class.create!(build_id: 3, build_finished_at: 1.hour.ago, processed: true) } - it { is_expected.to contain_exactly(event2, event3) } - end + it { is_expected.to contain_exactly(event2) } end describe '.for_partition', :freeze_time do -- GitLab From c21bb650a641d44771d7b4ed477e567497638ead Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Wed, 27 Sep 2023 22:53:58 +0200 Subject: [PATCH 09/10] Improve ServiceResponse usage --- .../ci_finished_builds_sync_service.rb | 14 ++++---- .../ci_finished_builds_sync_service_spec.rb | 32 +++++++++---------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb index a2108c04f8d89a..de5e3e983e0428 100644 --- a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb +++ b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb @@ -22,14 +22,17 @@ def initialize(worker_index: 0, total_workers: 1) def execute @total_record_count = 0 - return ServiceResponse.success(payload: { status: :disabled }) unless enabled? - - unless ClickHouse::Client.configuration.databases[:main].present? + unless enabled? return ServiceResponse.error( - message: 'ClickHouse database is not configured', payload: { status: :db_not_configured } + message: 'Feature ci_data_ingestion_to_click_house is disabled', + reason: :disabled ) end + unless ClickHouse::Client.configuration.databases[:main].present? + return ServiceResponse.error(message: 'ClickHouse database is not configured', reason: :db_not_configured) + end + # Prevent parallel jobs in_lock("#{self.class.name.underscore}/worker/#{@worker_index}", ttl: MAX_TTL, retries: 0) do ::Gitlab::Database::LoadBalancing::Session.without_sticky_writes do @@ -40,7 +43,7 @@ def execute end rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError => e # Skip retrying, just let the next worker to start after a few minutes - ServiceResponse.error(message: e.message, payload: { status: :skipped }) + ServiceResponse.error(message: e.message, reason: :skipped) end private @@ -60,7 +63,6 @@ def insert_new_finished_builds unless continue? return { - status: :processed, records_inserted: @total_record_count, reached_end_of_table: !@more_records } diff --git a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb index 0d01efbe876340..280f0030c6cdc0 100644 --- a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb +++ b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb @@ -47,9 +47,7 @@ expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original expect { execute }.to change { ci_finished_builds_row_count }.by(3) - expect(execute).to have_attributes({ payload: { - reached_end_of_table: true, records_inserted: 3, status: :processed - } }) + expect(execute).to have_attributes({ payload: { reached_end_of_table: true, records_inserted: 3 } }) records_by_id = ci_finished_builds_by_id expect(records_by_id.count).to eq 3 @@ -65,9 +63,7 @@ started_at: 30.minutes.ago, finished_at: 1.minute.ago) expect { execute }.to change { ci_finished_builds_row_count }.by(3) - expect(execute).to have_attributes({ payload: { - reached_end_of_table: true, records_inserted: 3, status: :processed - } }) + expect(execute).to have_attributes({ payload: { reached_end_of_table: true, records_inserted: 3 } }) create_sync_events(build) expect { service.execute }.to change { ci_finished_builds_row_count }.by(1) @@ -99,9 +95,7 @@ expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original expect { execute }.to change { ci_finished_builds_row_count }.by(3) - expect(execute).to have_attributes({ payload: { - reached_end_of_table: true, records_inserted: 3, status: :processed - } }) + expect(execute).to have_attributes({ payload: { reached_end_of_table: true, records_inserted: 3 } }) end end @@ -115,9 +109,7 @@ expect(ClickHouse::Client).to receive(:insert_csv).twice.and_call_original expect { execute }.to change { ci_finished_builds_row_count }.by(3) - expect(execute).to have_attributes({ payload: { - reached_end_of_table: true, records_inserted: 3, status: :processed - } }) + expect(execute).to have_attributes({ payload: { reached_end_of_table: true, records_inserted: 3 } }) end context 'with time limit being reached' do @@ -129,7 +121,7 @@ expect { execute }.to change { ci_finished_builds_row_count }.by(described_class::BUILDS_BATCH_SIZE) expect(execute).to have_attributes({ payload: { - reached_end_of_table: false, records_inserted: described_class::BUILDS_BATCH_SIZE, status: :processed + reached_end_of_table: false, records_inserted: described_class::BUILDS_BATCH_SIZE } }) end end @@ -204,7 +196,11 @@ end it 'skips execution' do - is_expected.to have_attributes({ payload: { status: :db_not_configured } }) + is_expected.to have_attributes({ + status: :error, + message: 'ClickHouse database is not configured', + reason: :db_not_configured + }) end end @@ -221,7 +217,7 @@ it 'does nothing' do expect { execute }.not_to change { ci_finished_builds_row_count } - expect(execute).to have_attributes({ status: :error, payload: { status: :skipped } }) + expect(execute).to have_attributes({ status: :error, reason: :skipped }) end end end @@ -233,7 +229,11 @@ end it 'skips execution' do - is_expected.to have_attributes({ payload: { status: :disabled } }) + is_expected.to have_attributes({ + status: :error, + message: 'Feature ci_data_ingestion_to_click_house is disabled', + reason: :disabled + }) end end -- GitLab From b8820729f5d2a4f7a2af2699fdfe67b5c900e1ed Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Wed, 27 Sep 2023 23:25:18 +0200 Subject: [PATCH 10/10] Address MR review comments --- .../models/ci/finished_build_ch_sync_event.rb | 2 + .../ci_finished_builds_sync_service.rb | 26 +++-- .../ci/finished_build_ch_sync_event_spec.rb | 34 +++++-- .../ci_finished_builds_sync_service_spec.rb | 99 ++++++++----------- 4 files changed, 77 insertions(+), 84 deletions(-) diff --git a/ee/app/models/ci/finished_build_ch_sync_event.rb b/ee/app/models/ci/finished_build_ch_sync_event.rb index 64f9bb1cdc75c9..a2cc0e40473a93 100644 --- a/ee/app/models/ci/finished_build_ch_sync_event.rb +++ b/ee/app/models/ci/finished_build_ch_sync_event.rb @@ -26,6 +26,8 @@ class FinishedBuildChSyncEvent < Ci::ApplicationRecord validates :build_id, presence: true validates :build_finished_at, presence: true + scope :order_by_build_id, -> { order(:build_id) } + scope :pending, -> { where(processed: false) } scope :for_partition, ->(partition) { where(partition: partition) } end diff --git a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb index de5e3e983e0428..29cc888ca67fc9 100644 --- a/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb +++ b/ee/app/services/click_house/data_ingestion/ci_finished_builds_sync_service.rb @@ -12,6 +12,7 @@ class CiFinishedBuildsSyncService MAX_RUNTIME = 120.seconds BUILDS_BATCH_SIZE = 500 INSERT_BATCH_SIZE = 5000 + BUILD_ID_PARTITIONS = 100 def initialize(worker_index: 0, total_workers: 1) @runtime_limiter = Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME) @@ -77,9 +78,9 @@ def insert_new_finished_builds CSV_MAPPING = { **BUILD_FIELD_NAMES.index_with { |n| n }, - **BUILD_EPOCH_FIELD_NAMES.index_with { |n| "casted_#{n}".to_sym }, - **RUNNER_FIELD_NAMES.map { |n| "runner_#{n}".to_sym }.index_with { |n| n }, - **RUNNER_MANAGER_FIELD_NAMES.map { |n| "runner_manager_#{n}".to_sym }.index_with { |n| n } + **BUILD_EPOCH_FIELD_NAMES.index_with { |n| :"casted_#{n}" }, + **RUNNER_FIELD_NAMES.map { |n| :"runner_#{n}" }.index_with { |n| n }, + **RUNNER_MANAGER_FIELD_NAMES.map { |n| :"runner_manager_#{n}" }.index_with { |n| n } }.freeze INSERT_FINISHED_BUILDS_QUERY = <<~SQL.squish @@ -132,28 +133,25 @@ def process_batch end def keyset_iterator_scope - lower_bound = (@worker_index * 100 / @total_workers).to_i - upper_bound = ((@worker_index + 1) * 100 / @total_workers).to_i + lower_bound = (@worker_index * BUILD_ID_PARTITIONS / @total_workers).to_i + upper_bound = ((@worker_index + 1) * BUILD_ID_PARTITIONS / @total_workers).to_i table_name = Ci::FinishedBuildChSyncEvent.quoted_table_name - # rubocop: disable CodeReuse/ActiveRecord - array_scope = Ci::FinishedBuildChSyncEvent - .select(:build_id_partition) - .from("generate_series(#{lower_bound}, #{upper_bound}) as #{table_name}(build_id_partition)") - - scope = Ci::FinishedBuildChSyncEvent.pending.order(:build_id) - # rubocop: enable CodeReuse/ActiveRecord + array_scope = Ci::FinishedBuildChSyncEvent.select(:build_id_partition) + .from("generate_series(#{lower_bound}, #{upper_bound}) as #{table_name}(build_id_partition)") # rubocop: disable CodeReuse/ActiveRecord opts = { in_operator_optimization_options: { array_scope: array_scope, array_mapping_scope: ->(id_expression) do - Ci::FinishedBuildChSyncEvent.where(Arel.sql('(build_id % 100)').eq(id_expression)) # rubocop: disable CodeReuse/ActiveRecord + Ci::FinishedBuildChSyncEvent + .where(Arel.sql("(build_id % #{BUILD_ID_PARTITIONS})") # rubocop: disable CodeReuse/ActiveRecord + .eq(id_expression)) end } } - Gitlab::Pagination::Keyset::Iterator.new(scope: scope, **opts) + Gitlab::Pagination::Keyset::Iterator.new(scope: Ci::FinishedBuildChSyncEvent.pending.order_by_build_id, **opts) end end end diff --git a/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb b/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb index 5071719242d720..ea7e7ab2305ba3 100644 --- a/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb +++ b/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb @@ -10,16 +10,6 @@ it { is_expected.to validate_presence_of(:build_finished_at) } end - describe '.pending' do - subject(:scope) { described_class.pending } - - let_it_be(:event1) { described_class.create!(build_id: 1, build_finished_at: 2.hours.ago, processed: true) } - let_it_be(:event2) { described_class.create!(build_id: 2, build_finished_at: 1.hour.ago) } - let_it_be(:event3) { described_class.create!(build_id: 3, build_finished_at: 1.hour.ago, processed: true) } - - it { is_expected.to contain_exactly(event2) } - end - describe '.for_partition', :freeze_time do subject(:scope) { described_class.for_partition(partition) } @@ -154,4 +144,28 @@ end end end + + describe 'sorting' do + let_it_be(:event3) { described_class.create!(build_id: 3, build_finished_at: 2.hours.ago, processed: true) } + let_it_be(:event1) { described_class.create!(build_id: 1, build_finished_at: 1.hour.ago) } + let_it_be(:event2) { described_class.create!(build_id: 2, build_finished_at: 1.hour.ago, processed: true) } + + describe '.order_by_build_id' do + subject(:scope) { described_class.order_by_build_id } + + it { is_expected.to eq([event1, event2, event3]) } + end + end + + describe 'scopes' do + let_it_be(:event1) { described_class.create!(build_id: 1, build_finished_at: 2.hours.ago, processed: true) } + let_it_be(:event2) { described_class.create!(build_id: 2, build_finished_at: 1.hour.ago) } + let_it_be(:event3) { described_class.create!(build_id: 3, build_finished_at: 1.hour.ago, processed: true) } + + describe '.pending' do + subject(:scope) { described_class.pending } + + it { is_expected.to contain_exactly(event2) } + end + end end diff --git a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb index 280f0030c6cdc0..b61e82bab1a3f6 100644 --- a/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb +++ b/ee/spec/services/click_house/data_ingestion/ci_finished_builds_sync_service_spec.rb @@ -14,26 +14,12 @@ architecture: 'amd64') end - let_it_be(:build1) do - create(:ci_build, :success, runner_manager: runner_manager1, created_at: 1.hour.ago, - queued_at: 59.minutes.ago, started_at: 57.minutes.ago, finished_at: 30.minutes.ago) - end - - let_it_be(:build2) do - create(:ci_build, :canceled, created_at: 30.minutes.ago, queued_at: 30.minutes.ago, - started_at: 29.minutes.ago, finished_at: 15.minutes.ago) - end - - let_it_be(:build3) do - create(:ci_build, :failed, created_at: 30.minutes.ago, queued_at: 30.minutes.ago, - started_at: 29.minutes.ago, finished_at: 15.minutes.ago) - end - - let_it_be(:build4) do - create(:ci_build, :pending) - end + let_it_be(:build1) { create(:ci_build, :success, runner_manager: runner_manager1) } + let_it_be(:build2) { create(:ci_build, :canceled) } + let_it_be(:build3) { create(:ci_build, :failed) } + let_it_be(:build4) { create(:ci_build, :pending) } - before do + before_all do create_sync_events(*Ci::Build.finished) end @@ -49,18 +35,17 @@ expect { execute }.to change { ci_finished_builds_row_count }.by(3) expect(execute).to have_attributes({ payload: { reached_end_of_table: true, records_inserted: 3 } }) - records_by_id = ci_finished_builds_by_id - expect(records_by_id.count).to eq 3 - expect(records_by_id).to match( - build1.id => a_hash_including(**expected_build_attributes(build1)), - build2.id => a_hash_including(**expected_build_attributes(build2)), - build3.id => a_hash_including(**expected_build_attributes(build3)) + records = ci_finished_builds + expect(records.count).to eq 3 + expect(records).to contain_exactly( + a_hash_including(**expected_build_attributes(build1)), + a_hash_including(**expected_build_attributes(build2)), + a_hash_including(**expected_build_attributes(build3)) ) end it 'processes only builds from Ci::FinishedBuildChSyncEvent' do - build = create(:ci_build, :failed, created_at: 30.minutes.ago, queued_at: 30.minutes.ago, - started_at: 30.minutes.ago, finished_at: 1.minute.ago) + build = create(:ci_build, :failed) expect { execute }.to change { ci_finished_builds_row_count }.by(3) expect(execute).to have_attributes({ payload: { reached_end_of_table: true, records_inserted: 3 } }) @@ -71,16 +56,15 @@ context 'when a finished build has nil finished_at value' do it 'skips the build' do - create(:ci_build, :failed, created_at: 30.minutes.ago, queued_at: 30.minutes.ago, - started_at: 30.minutes.ago, finished_at: nil) + create(:ci_build, :failed, finished_at: nil) expect { execute }.to change { ci_finished_builds_row_count }.by(3) - records_by_id = ci_finished_builds_by_id - expect(records_by_id.count).to eq 3 - expect(records_by_id).to match( - build1.id => a_hash_including(**expected_build_attributes(build1)), - build2.id => a_hash_including(**expected_build_attributes(build2)), - build3.id => a_hash_including(**expected_build_attributes(build3)) + records = ci_finished_builds + expect(records.count).to eq 3 + expect(records).to contain_exactly( + a_hash_including(**expected_build_attributes(build1)), + a_hash_including(**expected_build_attributes(build2)), + a_hash_including(**expected_build_attributes(build3)) ) end end @@ -140,18 +124,17 @@ it 'processes the builds' do expect { service.execute }.to change { ci_finished_builds_row_count }.by(3) - build4 = create(:ci_build, :failed, created_at: 3.minutes.ago, queued_at: 3.minutes.ago, - started_at: 2.minutes.ago, finished_at: 1.minute.ago) + build4 = create(:ci_build, :failed) create_sync_events(build4) expect { service.execute }.to change { ci_finished_builds_row_count }.by(1) - records_by_id = ci_finished_builds_by_id - expect(records_by_id.count).to eq 4 - expect(records_by_id).to match( - build1.id => a_hash_including(**expected_build_attributes(build1)), - build2.id => a_hash_including(**expected_build_attributes(build2)), - build3.id => a_hash_including(**expected_build_attributes(build3)), - build4.id => a_hash_including(**expected_build_attributes(build4)) + records = ci_finished_builds + expect(records.count).to eq 4 + expect(records).to contain_exactly( + a_hash_including(**expected_build_attributes(build1)), + a_hash_including(**expected_build_attributes(build2)), + a_hash_including(**expected_build_attributes(build3)), + a_hash_including(**expected_build_attributes(build4)) ) end @@ -159,21 +142,19 @@ it 'processes the builds' do expect { service.execute }.to change { ci_finished_builds_row_count }.by(3) - build4 = create(:ci_build, :failed, created_at: 3.minutes.ago, queued_at: 3.minutes.ago, - started_at: 2.minutes.ago, finished_at: 1.minute.ago) - build5 = create(:ci_build, :failed, created_at: 2.minutes.ago, queued_at: 2.minutes.ago, - started_at: 2.minutes.ago, finished_at: 1.minute.ago) + build4 = create(:ci_build, :failed) + build5 = create(:ci_build, :failed) create_sync_events(build4, build5) expect { execute }.to change { ci_finished_builds_row_count }.by(2) - records_by_id = ci_finished_builds_by_id - expect(records_by_id.count).to eq 5 - expect(records_by_id).to match( - build1.id => a_hash_including(**expected_build_attributes(build1)), - build2.id => a_hash_including(**expected_build_attributes(build2)), - build3.id => a_hash_including(**expected_build_attributes(build3)), - build4.id => a_hash_including(**expected_build_attributes(build4)), - build5.id => a_hash_including(**expected_build_attributes(build5)) + records = ci_finished_builds + expect(records.count).to eq 5 + expect(records).to contain_exactly( + a_hash_including(**expected_build_attributes(build1)), + a_hash_including(**expected_build_attributes(build2)), + a_hash_including(**expected_build_attributes(build3)), + a_hash_including(**expected_build_attributes(build4)), + a_hash_including(**expected_build_attributes(build5)) ) end end @@ -182,8 +163,7 @@ it 'does not process the build' do expect { service.execute }.to change { ci_finished_builds_row_count }.by(3) - create(:ci_build, :failed, created_at: 20.minutes.ago, queued_at: 20.minutes.ago, - started_at: 20.minutes.ago, finished_at: 19.minutes.ago) + create(:ci_build, :failed) expect { service.execute }.not_to change { ci_finished_builds_row_count } end @@ -247,11 +227,10 @@ def ci_finished_builds_row_count ClickHouse::Client.select('SELECT COUNT(*) AS count FROM ci_finished_builds', :main).first['count'] end - def ci_finished_builds_by_id + def ci_finished_builds ClickHouse::Client .select('SELECT * FROM ci_finished_builds', :main) .map(&:symbolize_keys) - .index_by { |r| r[:id] } end def expected_build_attributes(build) -- GitLab