diff --git a/config/initializers/postgres_partitioning.rb b/config/initializers/postgres_partitioning.rb
index a7728bf51b32ee677ed1037be2bd477e8e563c32..1cf40059a03590515355ef9aa626f7e91e16bf74 100644
--- a/config/initializers/postgres_partitioning.rb
+++ b/config/initializers/postgres_partitioning.rb
@@ -19,7 +19,8 @@
       IncidentManagement::PendingEscalations::Alert,
       IncidentManagement::PendingEscalations::Issue,
       Security::Finding,
-      Analytics::ValueStreamDashboard::Count
+      Analytics::ValueStreamDashboard::Count,
+      Ci::FinishedBuildChSyncEvent
     ])
 else
   Gitlab::Database::Partitioning.register_tables(
diff --git a/db/docs/p_ci_finished_build_ch_sync_events.yml b/db/docs/p_ci_finished_build_ch_sync_events.yml
new file mode 100644
index 0000000000000000000000000000000000000000..09938f99b16702049a4a888a6a2d77daad04a6a5
--- /dev/null
+++ b/db/docs/p_ci_finished_build_ch_sync_events.yml
@@ -0,0 +1,10 @@
+---
+table_name: p_ci_finished_build_ch_sync_events
+classes:
+- Ci::FinishedBuildChSyncEvent
+feature_categories:
+- runner_fleet
+description: Holds references to finished CI builds ready to be synced to ClickHouse
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/
+milestone: '16.5'
+gitlab_schema: gitlab_ci
diff --git a/db/migrate/20230915103259_create_ci_finished_build_ch_sync_events.rb b/db/migrate/20230915103259_create_ci_finished_build_ch_sync_events.rb
new file mode 100644
index 0000000000000000000000000000000000000000..718fd49f5c00936308cb8a3e61b3d56efe9cf31e
--- /dev/null
+++ b/db/migrate/20230915103259_create_ci_finished_build_ch_sync_events.rb
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+class CreateCiFinishedBuildChSyncEvents < Gitlab::Database::Migration[2.1]
+  def change
+    options = {
+      primary_key: [:build_id, :partition],
+      options: 'PARTITION BY LIST (partition)'
+    }
+
+    create_table(:p_ci_finished_build_ch_sync_events, **options) do |t|
+      # Do not bother with foreign key as it provides no benefit and has a performance cost. These get cleaned up over
+      # time anyway.
+      t.bigint :build_id, null: false
+      t.bigint :partition, null: false, default: 1
+      # rubocop: disable Migration/Datetime
+      # The source for this field does not have a timezone
+      t.datetime :build_finished_at, null: false
+      # rubocop: enable Migration/Datetime
+      t.boolean :processed, null: false, default: false
+
+      t.index '(build_id % 100), build_id',
+        where: 'processed = FALSE',
+        name: 'index_ci_finished_build_ch_sync_events_for_partitioned_query'
+    end
+  end
+end
diff --git a/db/schema_migrations/20230915103259 b/db/schema_migrations/20230915103259
new file mode 100644
index 0000000000000000000000000000000000000000..2cbfa061f31eff2c4aafb987a3ab8f8d8f8a470b
--- /dev/null
+++ b/db/schema_migrations/20230915103259
@@ -0,0 +1 @@
+d3dbc12fcadb285af3e4953addc76352c95bc6db8b20a43524627d8e6ed69b11
\ No newline at end of file
diff --git a/db/structure.sql b/db/structure.sql
index 768cfda1f68afb1a758cd3094951fb57d39a57f3..ea240c9b389ac8742a65df95c880603375021f84 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -646,6 +646,14 @@ CREATE TABLE p_batched_git_ref_updates_deletions (
 )
 PARTITION BY LIST (partition_id);
 
+CREATE TABLE p_ci_finished_build_ch_sync_events (
+    build_id bigint NOT NULL,
+    partition bigint DEFAULT 1 NOT NULL,
+    build_finished_at timestamp without time zone NOT NULL,
+    processed boolean DEFAULT false NOT NULL
+)
+PARTITION BY LIST (partition);
+
 CREATE TABLE projects_visits (
     id bigint NOT NULL,
     entity_id bigint NOT NULL,
@@ -28593,6 +28601,9 @@ ALTER TABLE ONLY organizations
 ALTER TABLE ONLY p_batched_git_ref_updates_deletions
     ADD CONSTRAINT p_batched_git_ref_updates_deletions_pkey PRIMARY KEY (id, partition_id);
 
+ALTER TABLE ONLY p_ci_finished_build_ch_sync_events
+    ADD CONSTRAINT p_ci_finished_build_ch_sync_events_pkey PRIMARY KEY (build_id, partition);
+
 ALTER TABLE ONLY p_ci_job_annotations
     ADD CONSTRAINT p_ci_job_annotations_pkey PRIMARY KEY (id, partition_id);
 
@@ -31340,6 +31351,8 @@ CREATE INDEX index_ci_editor_ai_messages_on_user_project_and_created_at ON ci_ed
 
 CREATE INDEX index_ci_editor_ai_messages_project_id ON ci_editor_ai_conversation_messages USING btree (project_id);
 
+CREATE INDEX index_ci_finished_build_ch_sync_events_for_partitioned_query ON ONLY p_ci_finished_build_ch_sync_events USING btree (((build_id % (100)::bigint)), build_id) WHERE (processed = false);
+
 CREATE INDEX index_ci_freeze_periods_on_project_id ON ci_freeze_periods USING btree (project_id);
 
 CREATE UNIQUE INDEX index_ci_group_variables_on_group_id_and_key_and_environment ON ci_group_variables USING btree (group_id, key, environment_scope);
diff --git a/ee/app/models/ci/finished_build_ch_sync_event.rb b/ee/app/models/ci/finished_build_ch_sync_event.rb
new file mode 100644
index 0000000000000000000000000000000000000000..4e73c98a3e1399cc93ca648748731c0708ec18ff
--- /dev/null
+++ b/ee/app/models/ci/finished_build_ch_sync_event.rb
@@ -0,0 +1,41 @@
+# frozen_string_literal: true
+
+module Ci
+  # Reference to a finished CI build that is ready to be synced to ClickHouse.
+  #
+  # Stored in a list-partitioned table managed with the :sliding_list strategy. The two
+  # callbacks below decide when a new partition is requested and when an old partition
+  # may be detached.
+  class FinishedBuildChSyncEvent < Ci::ApplicationRecord
+    PARTITION_DURATION = 1.day
+
+    include PartitionedTable
+
+    self.table_name = :p_ci_finished_build_ch_sync_events
+    self.primary_key = :build_id
+    self.ignored_columns = %i[partition] # rubocop: disable Cop/IgnoredColumns
+
+    partitioned_by :partition, strategy: :sliding_list,
+      next_partition_if: ->(active_partition) do
+        # Order explicitly: a bare `.first` sorts by primary key (build_id), and the build with
+        # the lowest ID is not necessarily the one that finished first.
+        oldest_record_in_partition = FinishedBuildChSyncEvent.for_partition(active_partition.value)
+          .order(:build_finished_at).first
+
+        oldest_record_in_partition.present? &&
+          oldest_record_in_partition.build_finished_at < PARTITION_DURATION.ago
+      end,
+      detach_partition_if: ->(partition) do
+        # True only when the partition holds no pending (unprocessed) events.
+        !FinishedBuildChSyncEvent.pending.for_partition(partition.value).exists?
+      end
+
+    validates :build_id, presence: true
+    validates :build_finished_at, presence: true
+
+    # Events that have not been processed yet.
+    scope :pending, -> { where(processed: false) }
+    # Events whose `partition` column equals the given value.
+    scope :for_partition, ->(partition) { where(partition: partition) }
+  end
+end
diff --git a/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb b/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..5071719242d72050208c1f443142ed61042f8fc7
--- /dev/null
+++ b/ee/spec/models/ci/finished_build_ch_sync_event_spec.rb
@@ -0,0 +1,166 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Ci::FinishedBuildChSyncEvent, type: :model, feature_category: :runner_fleet do
+  describe 'validations' do
+    subject(:event) { described_class.create!(build_id: 1, build_finished_at: 2.hours.ago) }
+
+    it { is_expected.to validate_presence_of(:build_id) }
+    it { is_expected.to validate_presence_of(:build_finished_at) }
+  end
+
+  describe '.pending' do
+    subject(:scope) { described_class.pending }
+
+    let_it_be(:event1) { described_class.create!(build_id: 1, build_finished_at: 2.hours.ago, processed: true) }
+    let_it_be(:event2) { described_class.create!(build_id: 2, build_finished_at: 1.hour.ago) }
+    let_it_be(:event3) { described_class.create!(build_id: 3, build_finished_at: 1.hour.ago, processed: true) }
+
+    it { is_expected.to contain_exactly(event2) }
+  end
+
+  describe '.for_partition', :freeze_time do
+    subject(:scope) { described_class.for_partition(partition) }
+
+    let_it_be(:partition_manager) { Gitlab::Database::Partitioning::PartitionManager.new(described_class) }
+
+    around do |example|
+      Gitlab::Database::SharedModel.using_connection(Ci::ApplicationRecord.connection) do
+        example.run
+      end
+    end
+
+    before do
+      described_class.create!(build_id: 1, build_finished_at: 2.hours.ago, processed: true)
+      described_class.create!(build_id: 2, build_finished_at: 1.hour.ago, processed: true)
+
+      travel(described_class::PARTITION_DURATION + 1.second)
+
+      partition_manager.sync_partitions
+      described_class.create!(build_id: 3, build_finished_at: 1.hour.ago)
+    end
+
+    context 'when partition = 1' do
+      let(:partition) { 1 }
+
+      it { is_expected.to match_array(described_class.where(build_id: [1, 2])) }
+    end
+
+    context 'when partition = 2' do
+      let(:partition) { 2 }
+
+      it { is_expected.to match_array(described_class.where(build_id: 3)) }
+    end
+  end
+
+  describe 'sliding_list partitioning' do
+    let(:partition_manager) { Gitlab::Database::Partitioning::PartitionManager.new(described_class) }
+    let(:partitioning_strategy) { described_class.partitioning_strategy }
+
+    around do |example|
+      Gitlab::Database::SharedModel.using_connection(Ci::ApplicationRecord.connection) do
+        example.run
+      end
+    end
+
+    describe 'next_partition_if callback' do
+      let(:active_partition) { partitioning_strategy.active_partition }
+
+      subject(:value) { partitioning_strategy.next_partition_if.call(active_partition) }
+
+      context 'when the partition is empty' do
+        it { is_expected.to eq(false) }
+      end
+
+      context 'when the partition has records' do
+        before do
+          described_class.create!(build_id: 1, build_finished_at: 2.hours.ago, processed: true)
+          described_class.create!(build_id: 2, build_finished_at: 1.minute.ago)
+        end
+
+        it { is_expected.to eq(false) }
+      end
+
+      context 'when the first record of the partition is older than PARTITION_DURATION' do
+        before do
+          described_class.create!(build_id: 1, build_finished_at: (described_class::PARTITION_DURATION + 1.day).ago)
+          described_class.create!(build_id: 2, build_finished_at: 1.minute.ago)
+        end
+
+        it { is_expected.to eq(true) }
+      end
+
+      context 'when the oldest record of the partition does not have the lowest build_id' do
+        before do
+          described_class.create!(build_id: 1, build_finished_at: 1.minute.ago)
+          described_class.create!(build_id: 2, build_finished_at: (described_class::PARTITION_DURATION + 1.day).ago)
+        end
+
+        it { is_expected.to eq(true) }
+      end
+    end
+
+    describe 'detach_partition_if callback' do
+      let(:active_partition) { partitioning_strategy.active_partition }
+
+      subject(:value) { partitioning_strategy.detach_partition_if.call(active_partition) }
+
+      context 'when the partition contains unprocessed records' do
+        before do
+          described_class.create!(build_id: 1, build_finished_at: 2.hours.ago, processed: true)
+          described_class.create!(build_id: 2, build_finished_at: 1.minute.ago)
+        end
+
+        it { is_expected.to eq(false) }
+      end
+
+      context 'when the partition contains only processed records' do
+        before do
+          described_class.create!(build_id: 1, build_finished_at: 2.hours.ago, processed: true)
+          described_class.create!(build_id: 2, build_finished_at: 1.minute.ago, processed: true)
+        end
+
+        it { is_expected.to eq(true) }
+      end
+    end
+
+    describe 'the behavior of the strategy' do
+      it 'moves records to new partitions as time passes', :freeze_time do
+        # We start with partition 1
+        expect(partitioning_strategy.current_partitions.map(&:value)).to eq([1])
+
+        # it's not a day old yet so no new partitions are created
+        partition_manager.sync_partitions
+
+        expect(partitioning_strategy.current_partitions.map(&:value)).to eq([1])
+
+        # add one record so the next partition will be created
+        described_class.create!(build_id: 1, build_finished_at: Time.current)
+
+        # after traveling forward a day
+        travel(described_class::PARTITION_DURATION + 1.second)
+
+        # a new partition is created
+        partition_manager.sync_partitions
+
+        expect(partitioning_strategy.current_partitions.map(&:value)).to eq([1, 2])
+
+        # and we can insert to the new partition
+        expect { described_class.create!(build_id: 5, build_finished_at: Time.current) }.not_to raise_error
+
+        # after processing old records
+        described_class.for_partition(1).update_all(processed: true)
+        described_class.for_partition(2).update_all(processed: true)
+
+        partition_manager.sync_partitions
+
+        # the old one is removed
+        expect(partitioning_strategy.current_partitions.map(&:value)).to eq([2])
+
+        # and we only have the newly created partition left.
+        expect(described_class.count).to eq(1)
+      end
+    end
+  end
+end
diff --git a/spec/db/schema_spec.rb b/spec/db/schema_spec.rb
index e4e1772f08e61acd55d498c8f44e392106bce085..153f466fa573458abb7aa5f39c69095cfba4a9c1 100644
--- a/spec/db/schema_spec.rb
+++ b/spec/db/schema_spec.rb
@@ -89,6 +89,7 @@
       oauth_applications: %w[owner_id],
       p_ci_builds: %w[erased_by_id trigger_request_id partition_id],
       p_batched_git_ref_updates_deletions: %w[project_id partition_id],
+      p_ci_finished_build_ch_sync_events: %w[build_id],
       product_analytics_events_experimental: %w[event_id txn_id user_id],
       project_build_artifacts_size_refreshes: %w[last_job_artifact_id],
       project_data_transfers: %w[project_id namespace_id],