From 52e8c572a423fa4c652a98290619c24a405e5617 Mon Sep 17 00:00:00 2001
From: Pedro Pombeiro
Date: Mon, 11 Sep 2023 19:28:19 +0200
Subject: [PATCH] Introduce worker to import finished builds to ClickHouse

EE: true
---
# Reviewer notes (free-form text placed between `---` and the first
# `diff --git`; it belongs to neither the commit message nor any hunk).
#
# - config/initializers/1_settings.rb: registers the cron entry
#   'click_house_ci_finished_builds_sync_worker'. 'cron' ('*/3 * * * *') and
#   'args' ([0, 1]) are assigned with `||=`, so an existing setting wins;
#   'job_class' is assigned with `=` and is always
#   'ClickHouse::CiFinishedBuildsSyncCronWorker'.
#   NOTE(review): the args presumably map positionally to
#   perform(worker_index, total_workers), whose own defaults are also 0 and 1
#   -- confirm against how cron args are dispatched.
# - ee/app/workers/all_queues.yml: adds the queue entry
#   cronjob:click_house_ci_finished_builds_sync_cron (feature category
#   runner_fleet, has_external_dependencies true, urgency low, idempotent true),
#   mirroring the attributes declared in the worker class below.
# - ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb:
#   #perform builds ::ClickHouse::DataIngestion::CiFinishedBuildsSyncService
#   with worker_index/total_workers, calls #execute, and passes either
#   response.payload (on success) or the response's message/reason keys to
#   log_extra_metadata_on_done(:result, ...).
# - ee/spec/...: covers the default single-worker run (expects
#   { reached_end_of_table: true, records_inserted: 1 } to be logged), the
#   path where ClickHouse::Client.configuration.databases is stubbed to {}
#   (expects message 'ClickHouse database is not configured' and reason
#   :db_not_configured), and argument pass-through for perform(0, 2).
#
 config/initializers/1_settings.rb             |  4 ++
 ee/app/workers/all_queues.yml                 |  9 +++
 .../ci_finished_builds_sync_cron_worker.rb    | 22 ++++++
 ...i_finished_builds_sync_cron_worker_spec.rb | 72 +++++++++++++++++++
 4 files changed, 107 insertions(+)
 create mode 100644 ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb
 create mode 100644 ee/spec/workers/click_house/ci_finished_builds_sync_cron_worker_spec.rb

diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb
index cf3084f7f9d639..5c45e5a457b25b 100644
--- a/config/initializers/1_settings.rb
+++ b/config/initializers/1_settings.rb
@@ -894,6 +894,10 @@
     Settings.cron_jobs['click_house_events_sync_worker'] ||= {}
     Settings.cron_jobs['click_house_events_sync_worker']['cron'] ||= "*/3 * * * *"
     Settings.cron_jobs['click_house_events_sync_worker']['job_class'] = 'ClickHouse::EventsSyncWorker'
+    Settings.cron_jobs['click_house_ci_finished_builds_sync_worker'] ||= {}
+    Settings.cron_jobs['click_house_ci_finished_builds_sync_worker']['cron'] ||= '*/3 * * * *'
+    Settings.cron_jobs['click_house_ci_finished_builds_sync_worker']['args'] ||= [0, 1]
+    Settings.cron_jobs['click_house_ci_finished_builds_sync_worker']['job_class'] = 'ClickHouse::CiFinishedBuildsSyncCronWorker'
   end
 end
 
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index e70597830f0bc5..ef7c59be5b35d7 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -111,6 +111,15 @@
   :weight: 1
   :idempotent: true
   :tags: []
+- :name: cronjob:click_house_ci_finished_builds_sync_cron
+  :worker_name: ClickHouse::CiFinishedBuildsSyncCronWorker
+  :feature_category: :runner_fleet
+  :has_external_dependencies: true
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: cronjob:compliance_management_merge_requests_compliance_violations_consistency
   :worker_name: ComplianceManagement::MergeRequests::ComplianceViolationsConsistencyWorker
   :feature_category: :compliance_management
diff --git a/ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb b/ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb
new file mode 100644
index 00000000000000..404d8b792faba4
--- /dev/null
+++ b/ee/app/workers/click_house/ci_finished_builds_sync_cron_worker.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+module ClickHouse
+  class CiFinishedBuildsSyncCronWorker
+    include ApplicationWorker
+
+    idempotent!
+    queue_namespace :cronjob
+    data_consistency :delayed
+    worker_has_external_dependencies! # the worker interacts with a ClickHouse database
+    feature_category :runner_fleet
+
+    def perform(worker_index = 0, total_workers = 1)
+      response = ::ClickHouse::DataIngestion::CiFinishedBuildsSyncService.new(
+        worker_index: worker_index, total_workers: total_workers
+      ).execute
+
+      result = response.success? ? response.payload : response.deconstruct_keys(%i[message reason])
+      log_extra_metadata_on_done(:result, result)
+    end
+  end
+end
diff --git a/ee/spec/workers/click_house/ci_finished_builds_sync_cron_worker_spec.rb b/ee/spec/workers/click_house/ci_finished_builds_sync_cron_worker_spec.rb
new file mode 100644
index 00000000000000..7807479235c1b0
--- /dev/null
+++ b/ee/spec/workers/click_house/ci_finished_builds_sync_cron_worker_spec.rb
@@ -0,0 +1,72 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ClickHouse::CiFinishedBuildsSyncCronWorker, :click_house, :freeze_time, feature_category: :runner_fleet do
+  let(:worker) { described_class.new }
+
+  let_it_be(:ci_build1) { create(:ci_build, :success) }
+  let_it_be(:ci_build2) { create(:ci_build, :pending) }
+
+  subject(:perform) { worker.perform }
+
+  before do
+    create_sync_events ci_build1
+  end
+
+  include_examples 'an idempotent worker' do
+    it 'calls CiFinishedBuildsSyncService and returns its response payload' do
+      expect(worker).to receive(:log_extra_metadata_on_done)
+        .with(:result, { reached_end_of_table: true, records_inserted: 1 })
+
+      params = { worker_index: 0, total_workers: 1 }
+      expect_next_instance_of(::ClickHouse::DataIngestion::CiFinishedBuildsSyncService, params) do |service|
+        expect(service).to receive(:execute).and_call_original
+      end
+
+      expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original
+
+      expect { perform }.to change { ci_finished_builds_row_count }.by(::Ci::Build.finished.count)
+    end
+
+    context 'when an error is reported from service' do
+      before do
+        allow(ClickHouse::Client.configuration).to receive(:databases).and_return({})
+      end
+
+      it 'skips execution' do
+        expect(worker).to receive(:log_extra_metadata_on_done)
+          .with(:result, { message: 'ClickHouse database is not configured', reason: :db_not_configured })
+
+        perform
+      end
+    end
+  end
+
+  context 'with 2 workers' do
+    subject(:perform) { worker.perform(0, 2) }
+
+    it 'calls CiFinishedBuildsSyncService with correct arguments' do
+      expect(worker).to receive(:log_extra_metadata_on_done).once
+
+      params = { worker_index: 0, total_workers: 2 }
+      expect_next_instance_of(::ClickHouse::DataIngestion::CiFinishedBuildsSyncService, params) do |service|
+        expect(service).to receive(:execute).and_call_original
+      end
+
+      expect(ClickHouse::Client).to receive(:insert_csv).once.and_call_original
+
+      perform
+    end
+  end
+
+  def create_sync_events(*builds)
+    builds.each do |build|
+      Ci::FinishedBuildChSyncEvent.new(build_id: build.id, build_finished_at: build.finished_at).save!
+    end
+  end
+
+  def ci_finished_builds_row_count
+    ClickHouse::Client.select('SELECT COUNT(*) AS count FROM ci_finished_builds', :main).first['count']
+  end
+end
-- 
GitLab