From 38bdec36df752a47e9d6c40f70e354f5892ad516 Mon Sep 17 00:00:00 2001 From: lma-git Date: Sat, 1 Jun 2024 15:05:04 -0700 Subject: [PATCH 1/4] Add worker to aggregate last 30-day catalog resource usage data Introduces a new service and worker that aggregates the last 30-day component usage counts for all catalog resources. The service is set up with a rolling 30-day window and utilizes the Usages::Aggregator class which aggregates the usage data in time-boxed batches. The worker is scheduled every 4 minutes. On each run, the aggregator resumes processing the usage data from where it last left off. The usage counts are updated in the catalog_resources.last_30_day_usage_count column. Each count is expected to be updated only once per day. This MR also adds database indexes to help optimize the queries executed in the batched aggregation process. Changelog: added --- app/models/ci/catalog/resource.rb | 3 +- .../aggregate_last30_day_usage_service.rb | 64 ++++++ app/workers/all_queues.yml | 9 + .../aggregate_last30_day_usage_worker.rb | 35 ++++ config/initializers/1_settings.rb | 3 + ...e_count_updated_at_on_catalog_resources.rb | 17 ++ ...component_usages_on_catalog_resource_id.rb | 26 +++ db/schema_migrations/20240602161102 | 1 + db/schema_migrations/20240602162649 | 1 + db/structure.sql | 6 +- lib/gitlab/ci/components/usages/aggregator.rb | 14 +- .../components/usages/aggregators/cursor.rb | 2 +- .../ci/components/usages/aggregator_spec.rb | 109 ++++------ .../usages/aggregators/cursor_spec.rb | 14 +- ...aggregate_last30_day_usage_service_spec.rb | 192 ++++++++++++++++++ .../aggregate_last30_day_usage_worker_spec.rb | 71 +++++++ 16 files changed, 485 insertions(+), 82 deletions(-) create mode 100644 app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb create mode 100644 app/workers/ci/catalog/resources/aggregate_last30_day_usage_worker.rb create mode 100644 db/post_migrate/20240602161102_index_last30_day_usage_count_updated_at_on_catalog_resources.rb create mode 100644 db/post_migrate/20240602162649_change_index_p_catalog_resource_component_usages_on_catalog_resource_id.rb create mode 100644 db/schema_migrations/20240602161102 create mode 100644 db/schema_migrations/20240602162649 create mode 100644 spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb create mode 100644 spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb diff --git a/app/models/ci/catalog/resource.rb b/app/models/ci/catalog/resource.rb index 27fa7c2b5731b4..8d3d10e418d4d5 100644 --- a/app/models/ci/catalog/resource.rb +++ b/app/models/ci/catalog/resource.rb @@ -49,8 +49,7 @@ class Resource < ::ApplicationRecord ) end - # TODO: The usage counts will be populated by a worker that aggregates the data daily. - # See https://gitlab.com/gitlab-org/gitlab/-/issues/452545. 
+ # The usage counts are updated daily by Ci::Catalog::Resources::AggregateLast30DayUsageWorker scope :order_by_last_30_day_usage_count_desc, -> { reorder(last_30_day_usage_count: :desc) } scope :order_by_last_30_day_usage_count_asc, -> { reorder(last_30_day_usage_count: :asc) } diff --git a/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb b/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb new file mode 100644 index 00000000000000..d6110ebdee780a --- /dev/null +++ b/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +module Ci + module Catalog + module Resources + # rubocop: disable CodeReuse/ActiveRecord -- Custom queries required for data processing + class AggregateLast30DayUsageService + TARGET_MODEL = Ci::Catalog::Resource + GROUP_BY_COLUMN = :catalog_resource_id + WINDOW_LENGTH = 30.days + + def execute + today = Date.today + + aggregator = Gitlab::Ci::Components::Usages::Aggregator.new( + target_scope: target_scope, + group_by_column: GROUP_BY_COLUMN, + usage_start_date: today - WINDOW_LENGTH, + usage_end_date: today - 1.day, + lease_key: lease_key + ) + + result = aggregator.each_batch do |usage_counts| + save_usage_counts!(usage_counts) + end + + if result + ServiceResponse.success(message: 'Targets processed', payload: result.to_h) + else + ServiceResponse.success(message: 'Lease taken', payload: { lease_key: lease_key }) + end + end + + private + + # When the aggregator has processed all targets, it restarts from the beginning (target_id=0). + # To minimize this redundant reprocessing, we limit the scope to only unprocessed targets. + def target_scope + TARGET_MODEL.where('last_30_day_usage_count_updated_at < ?', Date.today.to_time) + end + + def save_usage_counts!(usage_counts) + mapping = usage_counts.transform_values { |v| { last_30_day_usage_count: v } } + catalog_resource_ids = usage_counts.keys.map(&:id) + + TARGET_MODEL.transaction do + Gitlab::Database::BulkUpdate.execute(%i[last_30_day_usage_count], mapping) + + # Gitlab::Database::BulkUpdate does not support column type + # `:timestamptz` so we must update the timestamps separately. + TARGET_MODEL + .where('id >= ? 
AND id <= ?', catalog_resource_ids.min, catalog_resource_ids.max) + .update_all(last_30_day_usage_count_updated_at: Time.current) + end + end + + def lease_key + self.class.name + end + end + # rubocop: enable CodeReuse/ActiveRecord + end + end +end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index d086e5ad1225e8..8a62d27689f439 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -246,6 +246,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: cronjob:ci_catalog_resources_aggregate_last30_day_usage + :worker_name: Ci::Catalog::Resources::AggregateLast30DayUsageWorker + :feature_category: :pipeline_composition + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:ci_catalog_resources_process_sync_events :worker_name: Ci::Catalog::Resources::ProcessSyncEventsWorker :feature_category: :pipeline_composition diff --git a/app/workers/ci/catalog/resources/aggregate_last30_day_usage_worker.rb b/app/workers/ci/catalog/resources/aggregate_last30_day_usage_worker.rb new file mode 100644 index 00000000000000..d3154a822d404f --- /dev/null +++ b/app/workers/ci/catalog/resources/aggregate_last30_day_usage_worker.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +module Ci + module Catalog + module Resources + # This worker can be called multiple times simultaneously but only one instance can process data at a time. + # This is ensured by an exclusive lease guard in `Gitlab::Ci::Components::Usages::Aggregator`. + # The scheduling frequency should be == `Gitlab::Ci::Components::Usages::Aggregator::MAX_RUNTIME` + # so there is no time gap between job runs. + class AggregateLast30DayUsageWorker + include ApplicationWorker + include CronjobQueue # rubocop: disable Scalability/CronWorkerContext -- Periodic processing is required + + feature_category :pipeline_composition + + data_consistency :sticky + urgency :low + idempotent!
+ + deduplicate :until_executed, if_deduplicated: :reschedule_once, + ttl: Gitlab::Ci::Components::Usages::Aggregator::LEASE_TIMEOUT + + def perform + response = Ci::Catalog::Resources::AggregateLast30DayUsageService.new.execute + + log_hash_metadata_on_done( + status: response.status, + message: response.message, + **response.payload + ) + end + end + end + end +end diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb index 1a37895853c4be..819d9ae311a8dd 100644 --- a/config/initializers/1_settings.rb +++ b/config/initializers/1_settings.rb @@ -707,6 +707,9 @@ Settings.cron_jobs['performance_bar_stats'] ||= {} Settings.cron_jobs['performance_bar_stats']['cron'] ||= '*/2 * * * *' Settings.cron_jobs['performance_bar_stats']['job_class'] = 'GitlabPerformanceBarStatsWorker' +Settings.cron_jobs['ci_catalog_resources_aggregate_last30_day_usage_worker'] ||= {} +Settings.cron_jobs['ci_catalog_resources_aggregate_last30_day_usage_worker']['cron'] ||= '*/4 * * * *' +Settings.cron_jobs['ci_catalog_resources_aggregate_last30_day_usage_worker']['job_class'] = 'Ci::Catalog::Resources::AggregateLast30DayUsageWorker' Gitlab.ee do Settings.cron_jobs['analytics_devops_adoption_create_all_snapshots_worker'] ||= {} diff --git a/db/post_migrate/20240602161102_index_last30_day_usage_count_updated_at_on_catalog_resources.rb b/db/post_migrate/20240602161102_index_last30_day_usage_count_updated_at_on_catalog_resources.rb new file mode 100644 index 00000000000000..f5754ab5914666 --- /dev/null +++ b/db/post_migrate/20240602161102_index_last30_day_usage_count_updated_at_on_catalog_resources.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class IndexLast30DayUsageCountUpdatedAtOnCatalogResources < Gitlab::Database::Migration[2.2] + milestone '17.1' + + disable_ddl_transaction! + + INDEX_NAME = 'idx_catalog_resources_on_id_last_30_day_usage_count_updated_at' + + def up + add_concurrent_index :catalog_resources, [:id, :last_30_day_usage_count_updated_at], name: INDEX_NAME + end + + def down + remove_concurrent_index_by_name :catalog_resources, INDEX_NAME + end +end diff --git a/db/post_migrate/20240602162649_change_index_p_catalog_resource_component_usages_on_catalog_resource_id.rb b/db/post_migrate/20240602162649_change_index_p_catalog_resource_component_usages_on_catalog_resource_id.rb new file mode 100644 index 00000000000000..119026e31ba548 --- /dev/null +++ b/db/post_migrate/20240602162649_change_index_p_catalog_resource_component_usages_on_catalog_resource_id.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +class ChangeIndexPCatalogResourceComponentUsagesOnCatalogResourceId < Gitlab::Database::Migration[2.2] + include Gitlab::Database::PartitioningMigrationHelpers + + milestone '17.1' + + disable_ddl_transaction! 
+ + TABLE_NAME = :p_catalog_resource_component_usages + COLUMN_NAMES = [:catalog_resource_id, :used_by_project_id, :used_date] + INDEX_NAME = 'idx_component_usages_on_catalog_resource_used_by_proj_used_date' + + OLD_COLUMN_NAMES = [:catalog_resource_id] + OLD_INDEX_NAME = 'idx_p_catalog_resource_component_usages_on_catalog_resource_id' + + def up + add_concurrent_partitioned_index(TABLE_NAME, COLUMN_NAMES, name: INDEX_NAME) + remove_concurrent_partitioned_index_by_name(TABLE_NAME, OLD_INDEX_NAME) + end + + def down + add_concurrent_partitioned_index(TABLE_NAME, OLD_COLUMN_NAMES, name: OLD_INDEX_NAME) + remove_concurrent_partitioned_index_by_name(TABLE_NAME, INDEX_NAME) + end +end diff --git a/db/schema_migrations/20240602161102 b/db/schema_migrations/20240602161102 new file mode 100644 index 00000000000000..78e531a0542475 --- /dev/null +++ b/db/schema_migrations/20240602161102 @@ -0,0 +1 @@ +48e27a3376ea15329fed626a4839d5929affd797628e50b7c530741da92e8639 \ No newline at end of file diff --git a/db/schema_migrations/20240602162649 b/db/schema_migrations/20240602162649 new file mode 100644 index 00000000000000..b6298826b568fa --- /dev/null +++ b/db/schema_migrations/20240602162649 @@ -0,0 +1 @@ +87fd0c7f40c011772c12e74f93649bb2fa6c130da0f7a877f423099f94cebb35 \ No newline at end of file diff --git a/db/structure.sql b/db/structure.sql index 2be68f9f41fb79..522d59e7030a0b 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -24642,6 +24642,8 @@ CREATE INDEX idx_award_emoji_on_user_emoji_name_awardable_type_awardable_id ON a CREATE INDEX idx_build_artifacts_size_refreshes_state_updated_at ON project_build_artifacts_size_refreshes USING btree (state, updated_at); +CREATE INDEX idx_catalog_resources_on_id_last_30_day_usage_count_updated_at ON catalog_resources USING btree (id, last_30_day_usage_count_updated_at); + CREATE UNIQUE INDEX p_ci_job_artifacts_job_id_file_type_partition_id_idx ON ONLY p_ci_job_artifacts USING btree (job_id, file_type, partition_id); CREATE UNIQUE INDEX idx_ci_job_artifacts_on_job_id_file_type_and_partition_id_uniq ON ci_job_artifacts USING btree (job_id, file_type, partition_id); @@ -24652,6 +24654,8 @@ CREATE INDEX idx_ci_running_builds_on_runner_type_and_owner_xid_and_id ON ci_run CREATE INDEX idx_compliance_security_policies_on_policy_configuration_id ON compliance_framework_security_policies USING btree (policy_configuration_id); +CREATE INDEX idx_component_usages_on_catalog_resource_used_by_proj_used_date ON ONLY p_catalog_resource_component_usages USING btree (catalog_resource_id, used_by_project_id, used_date); + CREATE UNIQUE INDEX idx_component_usages_on_component_used_by_project_and_used_date ON ONLY p_catalog_resource_component_usages USING btree (component_id, used_by_project_id, used_date); CREATE INDEX idx_container_exp_policies_on_project_id_next_run_at ON container_expiration_policies USING btree (project_id, next_run_at) WHERE (enabled = true); @@ -24768,8 +24772,6 @@ CREATE INDEX idx_on_protected_branch ON approval_group_rules_protected_branches CREATE INDEX idx_open_issues_on_project_and_confidential_and_author_and_id ON issues USING btree (project_id, confidential, author_id, id) WHERE (state_id = 1); -CREATE INDEX idx_p_catalog_resource_component_usages_on_catalog_resource_id ON ONLY p_catalog_resource_component_usages USING btree (catalog_resource_id); - CREATE INDEX idx_packages_debian_group_component_files_on_architecture_id ON packages_debian_group_component_files USING btree (architecture_id); CREATE INDEX 
idx_packages_debian_project_component_files_on_architecture_id ON packages_debian_project_component_files USING btree (architecture_id); diff --git a/lib/gitlab/ci/components/usages/aggregator.rb b/lib/gitlab/ci/components/usages/aggregator.rb index 12fa36f9ec95b5..b3e1979ed905b8 100644 --- a/lib/gitlab/ci/components/usages/aggregator.rb +++ b/lib/gitlab/ci/components/usages/aggregator.rb @@ -25,8 +25,8 @@ module Usages # - Yields each batch of `usage_counts` to the given block. # - The block should be able to handle targets that might be reprocessed multiple times. # - `usage_counts` format: { target_object1 => 100, target_object2 => 200, ... } - # - If the lease is obtained, returns a Result containing the `cursor` object and - # `total_targets_completed`. Otherwise, returns nil. + # - If the lease is obtained, returns a Result containing `total_targets_completed` and + # `cursor_attributes`. Otherwise, returns nil. # # Example: # aggregator = Gitlab::Ci::Components::Usages::Aggregator.new( @@ -58,7 +58,7 @@ class Aggregator include Gitlab::Utils::StrongMemoize include ExclusiveLeaseGuard - Result = Struct.new(:cursor, :total_targets_completed, keyword_init: true) + Result = Struct.new(:total_targets_completed, :cursor_attributes, keyword_init: true) TARGET_BATCH_SIZE = 1000 DISTINCT_USAGE_BATCH_SIZE = 100 @@ -82,7 +82,10 @@ def each_batch(&usage_counts_block) try_obtain_lease do total_targets_completed = process_targets(&usage_counts_block) - Result.new(cursor: cursor, total_targets_completed: total_targets_completed) + Result.new( + total_targets_completed: total_targets_completed, + cursor_attributes: cursor.attributes.merge(max_target_id: cursor.max_target_id) + ) end end @@ -91,7 +94,8 @@ def each_batch(&usage_counts_block) attr_reader :target_scope, :group_by_column, :cursor, :runtime_limiter def process_targets - # Restore the scope from cursor so we can resume from the last run + # Restore the scope from cursor so we can resume from the last run. `cursor.target_id` is 0 + # when the Redis cursor is first initialized or when it advances past the max target ID. 
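+      # Illustrative example: if cursor.target_id is 500, the scope below resumes from `WHERE id >= 500`.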
restored_target_scope = target_scope.where('id >= ?', cursor.target_id) total_targets_completed = 0 diff --git a/lib/gitlab/ci/components/usages/aggregators/cursor.rb b/lib/gitlab/ci/components/usages/aggregators/cursor.rb index b04ed03c76bdfc..b7368aed6677c7 100644 --- a/lib/gitlab/ci/components/usages/aggregators/cursor.rb +++ b/lib/gitlab/ci/components/usages/aggregators/cursor.rb @@ -65,7 +65,7 @@ def advance def attributes { target_id: target_id, - usage_window: usage_window, + usage_window: usage_window.to_h, last_used_by_project_id: last_used_by_project_id, last_usage_count: last_usage_count } diff --git a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb index f83f29939e37f5..a71189db550361 100644 --- a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb +++ b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb @@ -7,6 +7,9 @@ let_it_be(:usage_start_date) { Date.today - 30.days } let_it_be(:usage_end_date) { Date.today - 1.day } + let_it_be(:resources) { create_list(:ci_catalog_resource, 5).sort_by(&:id) } + let_it_be(:expected_usage_counts) { resources.zip([3, 17, 0, 1, 26]).to_h } + let(:usage_model) { Ci::Catalog::Resources::Components::Usage } let(:target_scope) { Ci::Catalog::Resource } let(:group_by_column) { :catalog_resource_id } @@ -17,50 +20,39 @@ end before_all do - # First catalog resource: 3 components and 3 usages per component on usage_end_date - version = create(:ci_catalog_resource_version) - create_list(:ci_catalog_resource_component, 3, version: version).each do |component| - (1..3).each do |k| - create( - :ci_catalog_resource_component_usage, - component: component, - used_date: usage_end_date, - used_by_project_id: k - ) - end - end - - # Create 4 more catalog resources, each with 1-4 components and 0-6 usages - # per component on different dates before and after usage_end_date - create_list(:ci_catalog_resource_version, 4).each_with_index do |version, i| - create_list(:ci_catalog_resource_component, i + 1, version: version).each_with_index do |component, j| - next unless j > 0 - - (1..j * 2).each do |k| - create( - :ci_catalog_resource_component_usage, - component: component, - used_date: usage_end_date - 3.days + k.days, - used_by_project_id: k - ) + # Set up each resource with 1-5 versions, 1-5 components per version, and the expected usages per component + expected_usage_counts.each_with_index do |(resource, usage_count), i| + create_list(:ci_catalog_resource_version, i + 1, catalog_resource: resource).each do |version| + (1..i + 1).each do |j| + component = create(:ci_catalog_resource_component, version: version, name: "component#{j}") + + (1..usage_count).each do |k| + # Inside the usage window + create(:ci_catalog_resource_component_usage, + component: component, used_date: usage_start_date, used_by_project_id: k) + # Outside the usage window + create(:ci_catalog_resource_component_usage, + component: component, used_date: usage_start_date - k.days, used_by_project_id: k) + end end end end end describe '#each_batch' do - shared_examples 'when the runtime limit is not reached' do + shared_examples 'when the aggregator is not interrupted' do it 'returns the expected result' do # We process all catalog resources and advance the cursor batched_usage_counts, result = run_new_aggregator_each_batch expect(batched_usage_counts).to eq(expected_batched_usage_counts) expect(result.total_targets_completed).to eq(target_scope.count) - expect(result.cursor.attributes).to eq({ + 
expect(result.cursor_attributes).to eq({ target_id: 0, - usage_window: usage_window, + usage_window: usage_window.to_h, last_used_by_project_id: 0, - last_usage_count: 0 + last_usage_count: 0, + max_target_id: target_scope.maximum(:id).to_i }) end end @@ -70,9 +62,9 @@ stub_const("#{described_class}::DISTINCT_USAGE_BATCH_SIZE", 2) end - it_behaves_like 'when the runtime limit is not reached' + it_behaves_like 'when the aggregator is not interrupted' - context 'when the runtime limit is reached' do + context 'when the aggregator is interrupted' do before do # Sets the aggregator to break after the first iteration on each run stub_const("#{described_class}::MAX_RUNTIME", 0) @@ -84,11 +76,12 @@ expect(batched_usage_counts).to eq([]) expect(result.total_targets_completed).to eq(0) - expect(result.cursor.attributes).to eq({ + expect(result.cursor_attributes).to eq({ target_id: target_scope.first.id, - usage_window: usage_window, + usage_window: usage_window.to_h, last_used_by_project_id: 2, - last_usage_count: 2 + last_usage_count: 2, + max_target_id: target_scope.maximum(:id).to_i }) # On 2nd run, we get the complete usage count for the first catalog resource and advance the cursor @@ -96,18 +89,19 @@ expect(batched_usage_counts).to eq([{ target_scope.first => 3 }]) expect(result.total_targets_completed).to eq(1) - expect(result.cursor.attributes).to eq({ + expect(result.cursor_attributes).to eq({ target_id: target_scope.first.id + 1, - usage_window: usage_window, + usage_window: usage_window.to_h, last_used_by_project_id: 0, - last_usage_count: 0 + last_usage_count: 0, + max_target_id: target_scope.maximum(:id).to_i }) all_batched_usage_counts = batched_usage_counts + repeat_new_aggregator_each_batch_until_done batched_usage_counts_merged = all_batched_usage_counts.flatten.reduce(&:merge) expect(batched_usage_counts_merged.length).to eq(5) - expect(batched_usage_counts_merged).to eq(expected_batched_usage_counts_merged) + expect(batched_usage_counts_merged).to eq(expected_usage_counts) end context 'when a target is deleted between runs' do @@ -117,11 +111,12 @@ expect(batched_usage_counts).to eq([]) expect(result.total_targets_completed).to eq(0) - expect(result.cursor.attributes).to eq({ + expect(result.cursor_attributes).to eq({ target_id: target_scope.first.id, - usage_window: usage_window, + usage_window: usage_window.to_h, last_used_by_project_id: 2, - last_usage_count: 2 + last_usage_count: 2, + max_target_id: target_scope.maximum(:id).to_i }) target_scope.first.delete @@ -130,7 +125,7 @@ batched_usage_counts_merged = all_batched_usage_counts.reduce(&:merge) expect(batched_usage_counts_merged.length).to eq(4) - expect(batched_usage_counts_merged).to eq(expected_batched_usage_counts_merged) + expect(batched_usage_counts_merged).to eq(expected_usage_counts.except(resources.first)) end end @@ -142,13 +137,13 @@ batched_usage_counts_merged = all_batched_usage_counts.reduce(&:merge) expect(batched_usage_counts_merged.length).to eq(5) - expect(batched_usage_counts_merged).to eq(expected_batched_usage_counts_merged) + expect(batched_usage_counts_merged).to eq(expected_usage_counts.transform_values { 0 }) end end end end - it_behaves_like 'when the runtime limit is not reached' + it_behaves_like 'when the aggregator is not interrupted' it_behaves_like 'with multiple distinct usage batches' context 'with multiple target batches' do @@ -156,7 +151,7 @@ stub_const("#{described_class}::TARGET_BATCH_SIZE", 3) end - it_behaves_like 'when the runtime limit is not reached' + it_behaves_like 'when the 
aggregator is not interrupted' it_behaves_like 'with multiple distinct usage batches' end @@ -190,25 +185,9 @@ def run_new_aggregator_each_batch end def expected_batched_usage_counts - batched_usage_counts = [] - - target_scope.each_batch(of: described_class::TARGET_BATCH_SIZE) do |targets| - usage_counts = usage_model - .includes(:catalog_resource) - .select('catalog_resource_id, COUNT(DISTINCT used_by_project_id) AS usage_count') - .where(used_date: usage_start_date..usage_end_date) - .where(group_by_column => targets) - .group(:catalog_resource_id) - .each_with_object({}) { |r, hash| hash[r.catalog_resource] = r.usage_count } - - batched_usage_counts << targets.index_with { 0 }.merge(usage_counts) + resources.each_slice(described_class::TARGET_BATCH_SIZE).map do |batch| + expected_usage_counts.slice(*batch) end - - batched_usage_counts - end - - def expected_batched_usage_counts_merged - expected_batched_usage_counts.reduce(&:merge) end def repeat_new_aggregator_each_batch_until_done @@ -217,7 +196,7 @@ def repeat_new_aggregator_each_batch_until_done 30.times do batched_usage_counts, result = run_new_aggregator_each_batch all_batched_usage_counts << batched_usage_counts - break if result.cursor.target_id == 0 + break if result.cursor_attributes[:target_id] == 0 end all_batched_usage_counts.flatten diff --git a/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb index d4d416e4e586bb..9e633096550500 100644 --- a/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb +++ b/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb @@ -14,7 +14,7 @@ let(:initial_redis_attributes) do { target_id: 1, - usage_window: initial_redis_usage_window, + usage_window: initial_redis_usage_window.to_h, last_used_by_project_id: 100, last_usage_count: 10 } @@ -41,7 +41,7 @@ it 'resets last usage attributes' do expect(cursor.attributes).to eq({ target_id: initial_redis_attributes[:target_id], - usage_window: usage_window, + usage_window: usage_window.to_h, last_used_by_project_id: 0, last_usage_count: 0 }) @@ -58,7 +58,7 @@ it 'sets target_id and last usage attributes to zero' do expect(cursor.attributes).to eq({ target_id: 0, - usage_window: usage_window, + usage_window: usage_window.to_h, last_used_by_project_id: 0, last_usage_count: 0 }) @@ -78,7 +78,7 @@ expect(cursor.interrupted?).to eq(true) expect(cursor.attributes).to eq({ target_id: initial_redis_attributes[:target_id], - usage_window: usage_window, + usage_window: usage_window.to_h, last_used_by_project_id: initial_redis_attributes[:last_used_by_project_id] + 1, last_usage_count: initial_redis_attributes[:last_usage_count] + 1 }) @@ -92,7 +92,7 @@ expect(cursor.attributes).to eq({ target_id: initial_redis_attributes[:target_id] + 1, - usage_window: usage_window, + usage_window: usage_window.to_h, last_used_by_project_id: 0, last_usage_count: 0 }) @@ -115,7 +115,7 @@ expect(cursor.attributes).to eq({ target_id: initial_redis_attributes[:target_id] + 1, - usage_window: usage_window, + usage_window: usage_window.to_h, last_used_by_project_id: 0, last_usage_count: 0 }) @@ -128,7 +128,7 @@ expect(cursor.attributes).to eq({ target_id: 0, - usage_window: usage_window, + usage_window: usage_window.to_h, last_used_by_project_id: 0, last_usage_count: 0 }) diff --git a/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb b/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb new file mode 100644 index 
00000000000000..3477c31655c48a --- /dev/null +++ b/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb @@ -0,0 +1,192 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Ci::Catalog::Resources::AggregateLast30DayUsageService, :clean_gitlab_redis_shared_state, :freeze_time, + feature_category: :pipeline_composition do + let_it_be(:usage_start_date) { Date.today - described_class::WINDOW_LENGTH } + let_it_be(:usage_end_date) { Date.today - 1.day } + let_it_be(:initial_usage_count_updated_at) { Date.today.to_time - 1.hour } + + let_it_be(:resources) { create_list(:ci_catalog_resource, 4).sort_by(&:id) } + let_it_be(:expected_ordered_usage_counts) { [3, 1, 0, 15] } + + let(:expected_cursor_attributes) do + { + target_id: 0, + usage_window: usage_window_hash, + last_used_by_project_id: 0, + last_usage_count: 0, + max_target_id: 0 + } + end + + let(:usage_window_hash) { { start_date: usage_start_date, end_date: usage_end_date } } + let(:lease_key) { described_class.name } + let(:service) { described_class.new } + + before_all do + # Set up each resource with 1-4 versions, 1-4 components per version, and the expected usages per component + expected_ordered_usage_counts.each_with_index do |usage_count, i| + resource = resources[i] + + create_list(:ci_catalog_resource_version, i + 1, catalog_resource: resource).each do |version| + (1..i + 1).each do |j| + component = create(:ci_catalog_resource_component, version: version, name: "component#{j}") + + (1..usage_count).each do |k| + # Inside the usage window + create(:ci_catalog_resource_component_usage, + component: component, used_date: usage_start_date, used_by_project_id: k) + # Outside the usage window + create(:ci_catalog_resource_component_usage, + component: component, used_date: usage_start_date - k.days, used_by_project_id: k) + end + end + end + end + + Ci::Catalog::Resource.update_all(last_30_day_usage_count_updated_at: initial_usage_count_updated_at) + end + + describe '#execute' do + context 'when the aggregator is not interrupted' do + it 'aggregates usage data for all catalog resources' do + response = service.execute + + expect(response).to be_success + expect(response.payload).to eq({ + total_targets_completed: 4, + cursor_attributes: expected_cursor_attributes + }) + end + + it 'calls BulkUpdate once and updates usage counts for all catalog resources' do + expect(Gitlab::Database::BulkUpdate).to receive(:execute).once.and_call_original + + service.execute + + expect(ordered_usage_counts).to eq(expected_ordered_usage_counts) + expect(ordered_usage_counts_updated_at).to match_array([Time.current] * 4) + end + + context 'when there are two batches of usage counts' do + before do + stub_const('Gitlab::Ci::Components::Usages::Aggregator::TARGET_BATCH_SIZE', 2) + end + + it 'calls BulkUpdate twice and updates usage counts for all catalog resources' do + expect(Gitlab::Database::BulkUpdate).to receive(:execute).twice.and_call_original + + service.execute + + expect(ordered_usage_counts).to eq(expected_ordered_usage_counts) + expect(ordered_usage_counts_updated_at).to match_array([Time.current] * 4) + end + end + + context 'when some catalog resources have already been processed today' do + before_all do + resources.first(2).each do |resource| + resource.update!(last_30_day_usage_count_updated_at: Date.today.to_time) + end + end + + it 'aggregates usage data for only the unprocessed catalog resources' do + response = service.execute + + expect(response).to be_success + 
expect(response.payload).to eq({ + total_targets_completed: 2, + cursor_attributes: expected_cursor_attributes + }) + end + + it 'calls BulkUpdate once and updates usage counts for only the unprocessed catalog resources' do + expect(Gitlab::Database::BulkUpdate).to receive(:execute).once.and_call_original + + service.execute + + expect(ordered_usage_counts).to eq([0, 0, *expected_ordered_usage_counts.last(2)]) + expect(ordered_usage_counts_updated_at).to eq([[Date.today.to_time] * 2, [Time.current] * 2].flatten) + end + end + + context 'when all catalog resources have already been processed today' do + before_all do + Ci::Catalog::Resource.update_all(last_30_day_usage_count_updated_at: Date.today.to_time) + end + + it 'does not aggregate usage data' do + response = service.execute + + expect(response).to be_success + expect(response.payload).to eq({ + total_targets_completed: 0, + cursor_attributes: expected_cursor_attributes + }) + end + + it 'does not call BulkUpdate' do + expect(Gitlab::Database::BulkUpdate).not_to receive(:execute) + + expect { service.execute } + .to not_change { ordered_usage_counts } + .and not_change { ordered_usage_counts_updated_at } + end + end + end + + context 'when the aggregator is interrupted' do + before do + # Sets the aggregator to break after the first iteration on each run + stub_const('Gitlab::Ci::Components::Usages::Aggregator::MAX_RUNTIME', 0) + stub_const('Gitlab::Ci::Components::Usages::Aggregator::DISTINCT_USAGE_BATCH_SIZE', 2) + end + + it 'updates the expected usage counts for each run' do + # On 1st run, we get an incomplete usage count for the first catalog resource so it is not saved + expect { service.execute } + .to not_change { ordered_usage_counts } + .and not_change { ordered_usage_counts_updated_at } + + # On 2nd run, we get the complete usage count for the first catalog resource and save it + service.execute + + expect(ordered_usage_counts).to eq([expected_ordered_usage_counts.first, 0, 0, 0]) + expect(ordered_usage_counts_updated_at).to eq([Time.current, [initial_usage_count_updated_at] * 3].flatten) + + # Execute service repeatedly until done + 30.times do + response = service.execute + break if response.payload[:cursor_attributes][:target_id] == 0 + end + + expect(ordered_usage_counts).to eq(expected_ordered_usage_counts) + expect(ordered_usage_counts_updated_at).to match_array([Time.current] * 4) + end + end + + context 'when another instance is running with the same lease key' do + it 'returns a success response with the lease key' do + lease = Gitlab::ExclusiveLease.new(lease_key, timeout: 1.minute).tap(&:try_obtain) + response = service.execute + + expect(response).to be_success + expect(response.message).to eq('Lease taken') + expect(response.payload).to eq({ lease_key: lease_key }) + lease.cancel + end + end + end + + private + + def ordered_usage_counts + Ci::Catalog::Resource.order(:id).pluck(:last_30_day_usage_count) + end + + def ordered_usage_counts_updated_at + Ci::Catalog::Resource.order(:id).pluck(:last_30_day_usage_count_updated_at) + end +end diff --git a/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb b/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb new file mode 100644 index 00000000000000..a129ef10d48f86 --- /dev/null +++ b/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb @@ -0,0 +1,71 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Ci::Catalog::Resources::AggregateLast30DayUsageWorker, 
feature_category: :pipeline_composition do + subject(:worker) { described_class.new } + + include_examples 'an idempotent worker' + + it 'has the `until_executed` deduplicate strategy' do + expect(described_class.get_deduplicate_strategy).to eq(:until_executed) + end + + it 'has the option to reschedule once if deduplicated and a TTL' do + expect(described_class.get_deduplication_options) + .to include({ if_deduplicated: :reschedule_once, ttl: Gitlab::Ci::Components::Usages::Aggregator::LEASE_TIMEOUT }) + end + + describe '#perform', :clean_gitlab_redis_shared_state, :freeze_time do + let_it_be(:usage_start_date) { Date.today - Ci::Catalog::Resources::AggregateLast30DayUsageService::WINDOW_LENGTH } + let_it_be(:usage_end_date) { Date.today - 1.day } + + let_it_be(:resources) { create_list(:ci_catalog_resource, 3).sort_by(&:id) } + let_it_be(:expected_ordered_usage_counts) { [7, 12, 0] } + + let(:usage_window_hash) { { start_date: usage_start_date, end_date: usage_end_date } } + + subject(:perform) { worker.perform } + + before_all do + # Set up each resource with 1 version and 1 component, and the expected usages per component + expected_ordered_usage_counts.each_with_index do |usage_count, i| + resource = resources[i] + version = create(:ci_catalog_resource_version, catalog_resource: resource) + component = create(:ci_catalog_resource_component, version: version) + + (1..usage_count).each do |k| + create(:ci_catalog_resource_component_usage, + component: component, used_date: usage_start_date, used_by_project_id: k) + end + end + end + + it 'aggregates and updates usage counts for all catalog resources' do + perform + + ordered_usage_counts = Ci::Catalog::Resource.order(:id).pluck(:last_30_day_usage_count) + ordered_usage_counts_updated_at = Ci::Catalog::Resource.order(:id).pluck(:last_30_day_usage_count_updated_at) + + expect(ordered_usage_counts).to eq(expected_ordered_usage_counts) + expect(ordered_usage_counts_updated_at).to match_array([Time.current] * 3) + end + + it 'logs the service response' do + expect(worker).to receive(:log_hash_metadata_on_done) + .with( + status: :success, + message: 'Targets processed', + total_targets_completed: 3, + cursor_attributes: { + target_id: 0, + usage_window: usage_window_hash, + last_used_by_project_id: 0, + last_usage_count: 0, + max_target_id: 0 + }) + + perform + end + end +end -- GitLab From 698b98f976eb6877ee88ebdb8f3df3c94ce5294b Mon Sep 17 00:00:00 2001 From: lma-git Date: Wed, 5 Jun 2024 16:10:36 -0700 Subject: [PATCH 2/4] Add more comments and move max_target_id Added more clarifying comments and moved max_target_id so that it's included in the cursor attributes. 
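For reference, the cursor attribute hash now surfaces max_target_id in memory, while #save! keeps persisting only the resumable state to Redis. A rough sketch of the resulting shape, with made-up values:

    cursor.attributes
    # => {
    #      target_id: 0,
    #      usage_window: { start_date: Date.new(2024, 5, 2), end_date: Date.new(2024, 5, 31) },
    #      last_used_by_project_id: 0,
    #      last_usage_count: 0,
    #      max_target_id: 42  # computed from the target scope; excluded from the Redis payload
    #    }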
--- .../aggregate_last30_day_usage_service.rb | 7 ++-- .../aggregate_last30_day_usage_worker.rb | 2 +- lib/gitlab/ci/components/usages/aggregator.rb | 27 ++++++++------- .../components/usages/aggregators/cursor.rb | 15 +++++---- .../usages/aggregators/cursor_spec.rb | 33 +++++++------------ ...aggregate_last30_day_usage_service_spec.rb | 2 +- .../aggregate_last30_day_usage_worker_spec.rb | 4 +-- 7 files changed, 41 insertions(+), 49 deletions(-) diff --git a/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb b/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb index d6110ebdee780a..a524aa62111bfc 100644 --- a/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb +++ b/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb @@ -3,7 +3,10 @@ module Ci module Catalog module Resources - # rubocop: disable CodeReuse/ActiveRecord -- Custom queries required for data processing + # This service aggregates CI component usage data and updates `last_30_day_usage_count` for + # each catalog resource daily. It utilizes Gitlab::Ci::Components::Usages::Aggregator which + # implements a "continue later" mechanism to process the data in time-boxed jobs. + # rubocop: disable CodeReuse/ActiveRecord -- Custom queries required class AggregateLast30DayUsageService TARGET_MODEL = Ci::Catalog::Resource GROUP_BY_COLUMN = :catalog_resource_id @@ -49,7 +52,7 @@ def save_usage_counts!(usage_counts) # Gitlab::Database::BulkUpdate does not support column type # `:timestamptz` so we must update the timestamps separately. TARGET_MODEL - .where('id >= ? AND id <= ?', catalog_resource_ids.min, catalog_resource_ids.max) + .where(id: catalog_resource_ids) .update_all(last_30_day_usage_count_updated_at: Time.current) end end diff --git a/app/workers/ci/catalog/resources/aggregate_last30_day_usage_worker.rb b/app/workers/ci/catalog/resources/aggregate_last30_day_usage_worker.rb index d3154a822d404f..693f9363c1cb9e 100644 --- a/app/workers/ci/catalog/resources/aggregate_last30_day_usage_worker.rb +++ b/app/workers/ci/catalog/resources/aggregate_last30_day_usage_worker.rb @@ -18,7 +18,7 @@ class AggregateLast30DayUsageWorker idempotent! deduplicate :until_executed, if_deduplicated: :reschedule_once, - ttl: Gitlab::Ci::Components::Usages::Aggregator::LEASE_TIMEOUT + ttl: Gitlab::Ci::Components::Usages::Aggregator::WORKER_DEDUP_TTL def perform response = Ci::Catalog::Resources::AggregateLast30DayUsageService.new.execute diff --git a/lib/gitlab/ci/components/usages/aggregator.rb b/lib/gitlab/ci/components/usages/aggregator.rb index b3e1979ed905b8..18839f713dd192 100644 --- a/lib/gitlab/ci/components/usages/aggregator.rb +++ b/lib/gitlab/ci/components/usages/aggregator.rb @@ -7,23 +7,24 @@ module Usages # Component usage is defined as the number of unique `used_by_project_id`s in the table # `p_catalog_resource_component_usages` for a given scope. # - # This aggregator iterates through the target scope in batches. For each target ID, it collects - # the usage count using `distinct_each_batch` for the given usage window. Since this process can - # be interrupted when it reaches MAX_RUNTIME, we utilize a Redis cursor so the aggregator can - # resume from where it left off on each run. We collect the count in Rails because the SQL query - # `COUNT(DISTINCT(*))` is not performant when the dataset is large. + # This aggregator is intended to be run in a scheduled cron job. 
It implements a "continue later" + # mechanism with a Redis cursor, which enables the work to continue from where it was last interrupted + # on each run. It iterates through the target scope in batches. For each target ID, it collects the + # usage count using `distinct_each_batch` for the given usage window. We collect the count in Rails + # because the SQL query `COUNT(DISTINCT(*))` is not performant when the dataset is large. # - # RUNTIME: The actual total runtime will be slightly longer than MAX_RUNTIME because + # RUNTIME: The actual total runtime will be longer than MAX_RUNTIME because # it depends on the execution time of `&usage_counts_block`. # EXCLUSIVE LEASE: This aggregator is protected from parallel processing with an exclusive lease guard. # WORKER: The worker running this service should be scheduled at the same cadence as MAX_RUNTIME, with: - # deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: LEASE_TIMEOUT + # deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: WORKER_DEDUP_TTL # ##### Usage # # each_batch: - # - Yields each batch of `usage_counts` to the given block. - # - The block should be able to handle targets that might be reprocessed multiple times. + # - Yields each batch of `usage_counts` to the given block. The block should: + # - Be able to handle targets that might be reprocessed multiple times. + # - Not exceed 1 minute in execution time. # - `usage_counts` format: { target_object1 => 100, target_object2 => 200, ... } # - If the lease is obtained, returns a Result containing `total_targets_completed` and # `cursor_attributes`. Otherwise, returns nil. @@ -63,7 +64,8 @@ class Aggregator TARGET_BATCH_SIZE = 1000 DISTINCT_USAGE_BATCH_SIZE = 100 MAX_RUNTIME = 4.minutes # Should be >= job scheduling frequency so there is no gap between job runs - LEASE_TIMEOUT = 5.minutes # Should be MAX_RUNTIME + extra time to execute `&usage_counts_block` + WORKER_DEDUP_TTL = MAX_RUNTIME + 1.minute # Includes extra time to execute `&usage_counts_block` + LEASE_TIMEOUT = 10.minutes def initialize(target_scope:, group_by_column:, usage_start_date:, usage_end_date:, lease_key:) @target_scope = target_scope @@ -82,10 +84,7 @@ def each_batch(&usage_counts_block) try_obtain_lease do total_targets_completed = process_targets(&usage_counts_block) - Result.new( - total_targets_completed: total_targets_completed, - cursor_attributes: cursor.attributes.merge(max_target_id: cursor.max_target_id) - ) + Result.new(total_targets_completed: total_targets_completed, cursor_attributes: cursor.attributes) end end diff --git a/lib/gitlab/ci/components/usages/aggregators/cursor.rb b/lib/gitlab/ci/components/usages/aggregators/cursor.rb index b7368aed6677c7..38711aaec264fc 100644 --- a/lib/gitlab/ci/components/usages/aggregators/cursor.rb +++ b/lib/gitlab/ci/components/usages/aggregators/cursor.rb @@ -67,18 +67,14 @@ def attributes target_id: target_id, usage_window: usage_window.to_h, last_used_by_project_id: last_used_by_project_id, - last_usage_count: last_usage_count + last_usage_count: last_usage_count, + max_target_id: max_target_id } end - def max_target_id - target_scope.maximum(:id).to_i - end - strong_memoize_attr :max_target_id - def save! 
Gitlab::Redis::SharedState.with do |redis| - redis.set(redis_key, attributes.to_json, ex: CURSOR_REDIS_KEY_TTL) + redis.set(redis_key, attributes.except(:max_target_id).to_json, ex: CURSOR_REDIS_KEY_TTL) end end @@ -107,6 +103,11 @@ def reset_last_usage_attributes @last_usage_count = 0 end + def max_target_id + target_scope.maximum(:id).to_i + end + strong_memoize_attr :max_target_id + def parse_date(date_str) Date.parse(date_str) if date_str end diff --git a/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb index 9e633096550500..30e632d8828809 100644 --- a/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb +++ b/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb @@ -30,7 +30,7 @@ describe '.new' do it 'fetches and parses the attributes from Redis' do - expect(cursor.attributes).to eq(initial_redis_attributes) + expect(cursor.attributes).to include(initial_redis_attributes) end context 'when Redis usage_window is different than the given usage_window' do @@ -39,7 +39,7 @@ end it 'resets last usage attributes' do - expect(cursor.attributes).to eq({ + expect(cursor.attributes).to include({ target_id: initial_redis_attributes[:target_id], usage_window: usage_window.to_h, last_used_by_project_id: 0, @@ -56,7 +56,7 @@ end it 'sets target_id and last usage attributes to zero' do - expect(cursor.attributes).to eq({ + expect(cursor.attributes).to include({ target_id: 0, usage_window: usage_window.to_h, last_used_by_project_id: 0, @@ -76,7 +76,7 @@ ) expect(cursor.interrupted?).to eq(true) - expect(cursor.attributes).to eq({ + expect(cursor.attributes).to include({ target_id: initial_redis_attributes[:target_id], usage_window: usage_window.to_h, last_used_by_project_id: initial_redis_attributes[:last_used_by_project_id] + 1, @@ -90,7 +90,7 @@ it 'sets new target_id and resets last usage attributes' do cursor.target_id = initial_redis_attributes[:target_id] + 1 - expect(cursor.attributes).to eq({ + expect(cursor.attributes).to include({ target_id: initial_redis_attributes[:target_id] + 1, usage_window: usage_window.to_h, last_used_by_project_id: 0, @@ -101,7 +101,7 @@ context 'when new target_id is the same as cursor target_id' do it 'does not change cursor attributes' do - expect(cursor.attributes).to eq(initial_redis_attributes) + expect(cursor.attributes).to include(initial_redis_attributes) end end end @@ -117,7 +117,8 @@ target_id: initial_redis_attributes[:target_id] + 1, usage_window: usage_window.to_h, last_used_by_project_id: 0, - last_usage_count: 0 + last_usage_count: 0, + max_target_id: max_target_id }) end end @@ -130,27 +131,15 @@ target_id: 0, usage_window: usage_window.to_h, last_used_by_project_id: 0, - last_usage_count: 0 + last_usage_count: 0, + max_target_id: max_target_id }) end end end - describe '#max_target_id' do - let(:target_scope) { Ci::Catalog::Resource } - - before_all do - create(:ci_catalog_resource, id: 123) - create(:ci_catalog_resource, id: 100) - end - - it 'returns maximum ID of the target scope' do - expect(cursor.max_target_id).to eq(123) - end - end - describe '#save!' 
do - it 'saves cursor attributes to Redis as JSON' do + it 'saves cursor attributes except max_target_id to Redis as JSON' do cursor.target_id = 11 cursor.interrupt!( last_used_by_project_id: 33, diff --git a/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb b/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb index 3477c31655c48a..59b22d544bc2bd 100644 --- a/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb +++ b/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb @@ -6,7 +6,7 @@ feature_category: :pipeline_composition do let_it_be(:usage_start_date) { Date.today - described_class::WINDOW_LENGTH } let_it_be(:usage_end_date) { Date.today - 1.day } - let_it_be(:initial_usage_count_updated_at) { Date.today.to_time - 1.hour } + let_it_be(:initial_usage_count_updated_at) { Date.yesterday.to_time } let_it_be(:resources) { create_list(:ci_catalog_resource, 4).sort_by(&:id) } let_it_be(:expected_ordered_usage_counts) { [3, 1, 0, 15] } diff --git a/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb b/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb index a129ef10d48f86..83367a23749b77 100644 --- a/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb +++ b/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb @@ -12,8 +12,8 @@ end it 'has the option to reschedule once if deduplicated and a TTL' do - expect(described_class.get_deduplication_options) - .to include({ if_deduplicated: :reschedule_once, ttl: Gitlab::Ci::Components::Usages::Aggregator::LEASE_TIMEOUT }) + expect(described_class.get_deduplication_options).to include( + { if_deduplicated: :reschedule_once, ttl: Gitlab::Ci::Components::Usages::Aggregator::WORKER_DEDUP_TTL }) end describe '#perform', :clean_gitlab_redis_shared_state, :freeze_time do -- GitLab From 82fbe307e0f35673d1b02c5e7ac77d3b0cdec544 Mon Sep 17 00:00:00 2001 From: lma-git Date: Thu, 6 Jun 2024 16:20:04 -0700 Subject: [PATCH 3/4] Replace target_scope with stop condition instead Replaces target_scope with target_model and uses a stop condition to limit reprocessing instead of a SQL condition. --- .../aggregate_last30_day_usage_service.rb | 20 +++++--- ...e_count_updated_at_on_catalog_resources.rb | 4 +- db/structure.sql | 4 +- lib/gitlab/ci/components/usages/aggregator.rb | 35 +++++++------- .../components/usages/aggregators/cursor.rb | 8 ++-- lib/gitlab/database/bulk_update.rb | 5 ++ .../ci/components/usages/aggregator_spec.rb | 24 +++++----- .../usages/aggregators/cursor_spec.rb | 4 +- ...aggregate_last30_day_usage_service_spec.rb | 47 +++---------------- .../aggregate_last30_day_usage_worker_spec.rb | 2 +- 10 files changed, 66 insertions(+), 87 deletions(-) diff --git a/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb b/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb index a524aa62111bfc..b6cf7ed6990c76 100644 --- a/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb +++ b/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb @@ -8,15 +8,17 @@ module Resources # implements a "continue later" mechanism to process the data in time-boxed jobs. 
# rubocop: disable CodeReuse/ActiveRecord -- Custom queries required class AggregateLast30DayUsageService + include Gitlab::Utils::StrongMemoize + TARGET_MODEL = Ci::Catalog::Resource GROUP_BY_COLUMN = :catalog_resource_id WINDOW_LENGTH = 30.days def execute - today = Date.today + return ServiceResponse.success(message: "Processing complete for #{today}") if done_processing? aggregator = Gitlab::Ci::Components::Usages::Aggregator.new( - target_scope: target_scope, + target_model: TARGET_MODEL, group_by_column: GROUP_BY_COLUMN, usage_start_date: today - WINDOW_LENGTH, usage_end_date: today - 1.day, @@ -36,10 +38,11 @@ def execute private - # When the aggregator has processed all targets, it restarts from the beginning (target_id=0). - # To minimize this redundant reprocessing, we limit the scope to only unprocessed targets. - def target_scope - TARGET_MODEL.where('last_30_day_usage_count_updated_at < ?', Date.today.to_time) + def done_processing? + min_updated_at = TARGET_MODEL.minimum(:last_30_day_usage_count_updated_at) + return true unless min_updated_at + + min_updated_at >= today.to_time end def save_usage_counts!(usage_counts) @@ -57,6 +60,11 @@ def save_usage_counts!(usage_counts) end end + def today + Date.today + end + strong_memoize_attr :today + def lease_key self.class.name end diff --git a/db/post_migrate/20240602161102_index_last30_day_usage_count_updated_at_on_catalog_resources.rb b/db/post_migrate/20240602161102_index_last30_day_usage_count_updated_at_on_catalog_resources.rb index f5754ab5914666..d4444dc5bb4f4f 100644 --- a/db/post_migrate/20240602161102_index_last30_day_usage_count_updated_at_on_catalog_resources.rb +++ b/db/post_migrate/20240602161102_index_last30_day_usage_count_updated_at_on_catalog_resources.rb @@ -5,10 +5,10 @@ class IndexLast30DayUsageCountUpdatedAtOnCatalogResources < Gitlab::Database::Mi disable_ddl_transaction! 
- INDEX_NAME = 'idx_catalog_resources_on_id_last_30_day_usage_count_updated_at' + INDEX_NAME = 'index_catalog_resources_on_last_30_day_usage_count_updated_at' def up - add_concurrent_index :catalog_resources, [:id, :last_30_day_usage_count_updated_at], name: INDEX_NAME + add_concurrent_index :catalog_resources, :last_30_day_usage_count_updated_at, name: INDEX_NAME end def down diff --git a/db/structure.sql b/db/structure.sql index 522d59e7030a0b..7d57c87429a8b1 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -24642,8 +24642,6 @@ CREATE INDEX idx_award_emoji_on_user_emoji_name_awardable_type_awardable_id ON a CREATE INDEX idx_build_artifacts_size_refreshes_state_updated_at ON project_build_artifacts_size_refreshes USING btree (state, updated_at); -CREATE INDEX idx_catalog_resources_on_id_last_30_day_usage_count_updated_at ON catalog_resources USING btree (id, last_30_day_usage_count_updated_at); - CREATE UNIQUE INDEX p_ci_job_artifacts_job_id_file_type_partition_id_idx ON ONLY p_ci_job_artifacts USING btree (job_id, file_type, partition_id); CREATE UNIQUE INDEX idx_ci_job_artifacts_on_job_id_file_type_and_partition_id_uniq ON ci_job_artifacts USING btree (job_id, file_type, partition_id); @@ -25284,6 +25282,8 @@ CREATE INDEX index_catalog_resource_versions_on_resource_id_and_released_at ON c CREATE INDEX index_catalog_resources_on_last_30_day_usage_count ON catalog_resources USING btree (last_30_day_usage_count) WHERE (state = 1); +CREATE INDEX index_catalog_resources_on_last_30_day_usage_count_updated_at ON catalog_resources USING btree (last_30_day_usage_count_updated_at); + CREATE UNIQUE INDEX index_catalog_resources_on_project_id ON catalog_resources USING btree (project_id); CREATE INDEX index_catalog_resources_on_search_vector ON catalog_resources USING gin (search_vector); diff --git a/lib/gitlab/ci/components/usages/aggregator.rb b/lib/gitlab/ci/components/usages/aggregator.rb index 18839f713dd192..ddb6121831a1cc 100644 --- a/lib/gitlab/ci/components/usages/aggregator.rb +++ b/lib/gitlab/ci/components/usages/aggregator.rb @@ -9,15 +9,20 @@ module Usages # # This aggregator is intended to be run in a scheduled cron job. It implements a "continue later" # mechanism with a Redis cursor, which enables the work to continue from where it was last interrupted - # on each run. It iterates through the target scope in batches. For each target ID, it collects the - # usage count using `distinct_each_batch` for the given usage window. We collect the count in Rails - # because the SQL query `COUNT(DISTINCT(*))` is not performant when the dataset is large. + # on each run. It iterates through the target table in batches, in order of ID ascending. For each + # target ID, it collects the usage count using `distinct_each_batch` for the given usage window. + # We collect the count in Rails because the SQL query `COUNT(DISTINCT(*))` is not performant when the + # data volume is large. # # RUNTIME: The actual total runtime will be longer than MAX_RUNTIME because # it depends on the execution time of `&usage_counts_block`. # EXCLUSIVE LEASE: This aggregator is protected from parallel processing with an exclusive lease guard. # WORKER: The worker running this service should be scheduled at the same cadence as MAX_RUNTIME, with: # deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: WORKER_DEDUP_TTL + # STOPPING: When the aggregator's cursor advances past the max target_id, it resets to 0. 
This means + # it may reprocess targets that have already been processed for the given usage window. + # To minimize redundant reprocessing, you should prevent the aggregator from running once it + # meets a certain stop condition (e.g. when all targets have been marked as "processed"). # ##### Usage # @@ -30,8 +35,10 @@ module Usages # `cursor_attributes`. Otherwise, returns nil. # # Example: + # return if done_processing? + # # aggregator = Gitlab::Ci::Components::Usages::Aggregator.new( - # target_scope: Ci::Catalog::Resource.scope_to_get_only_unprocessed_targets, + # target_model: Ci::Catalog::Resource, # group_by_column: :catalog_resource_id, # usage_start_date: Date.today - 30.days, # usage_end_date: Date.today - 1.day, @@ -44,19 +51,13 @@ module Usages # ##### Parameters # - # target_scope: - # - ActiveRecord relation to retrieve the target IDs. Processed in order of ID ascending. - # - The target model class should have `include EachBatch`. - # - When cursor.target_id gets reset to 0, the aggregator may reprocess targets that have - # already been processed for the given usage window. To minimize redundant reprocessing, - # add a limiting condition to the target scope so it only retrieves unprocessed targets. - # group_by_column: This should be the usage table's foreign key of the target_scope. + # target_model: Target model to iterate through. Model class should contain `include EachBatch`. + # group_by_column: This should be the usage table's foreign key of the target_model. # usage_start_date & usage_end_date: Date objects specifiying the window of usage data to aggregate. # lease_key: Used for obtaining an exclusive lease. Also used as part of the cursor Redis key. # # rubocop: disable CodeReuse/ActiveRecord -- Custom queries required for data processing class Aggregator - include Gitlab::Utils::StrongMemoize include ExclusiveLeaseGuard Result = Struct.new(:total_targets_completed, :cursor_attributes, keyword_init: true) @@ -67,15 +68,15 @@ class Aggregator WORKER_DEDUP_TTL = MAX_RUNTIME + 1.minute # Includes extra time to execute `&usage_counts_block` LEASE_TIMEOUT = 10.minutes - def initialize(target_scope:, group_by_column:, usage_start_date:, usage_end_date:, lease_key:) - @target_scope = target_scope + def initialize(target_model:, group_by_column:, usage_start_date:, usage_end_date:, lease_key:) + @target_model = target_model @group_by_column = group_by_column @lease_key = lease_key # Used by ExclusiveLeaseGuard @runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(MAX_RUNTIME) @cursor = Aggregators::Cursor.new( redis_key: "#{lease_key}:cursor", - target_scope: target_scope, + target_model: target_model, usage_window: Aggregators::Cursor::Window.new(usage_start_date, usage_end_date) ) end @@ -90,12 +91,12 @@ def each_batch(&usage_counts_block) private - attr_reader :target_scope, :group_by_column, :cursor, :runtime_limiter + attr_reader :target_model, :group_by_column, :cursor, :runtime_limiter def process_targets # Restore the scope from cursor so we can resume from the last run. `cursor.target_id` is 0 # when the Redis cursor is first initialized or when it advances past the max target ID. 
-            restored_target_scope = target_scope.where('id >= ?', cursor.target_id)
+            restored_target_scope = target_model.where('id >= ?', cursor.target_id)
             total_targets_completed = 0
 
             restored_target_scope.each_batch(of: TARGET_BATCH_SIZE) do |targets_relation|
diff --git a/lib/gitlab/ci/components/usages/aggregators/cursor.rb b/lib/gitlab/ci/components/usages/aggregators/cursor.rb
index 38711aaec264fc..d58ddf1c40cb91 100644
--- a/lib/gitlab/ci/components/usages/aggregators/cursor.rb
+++ b/lib/gitlab/ci/components/usages/aggregators/cursor.rb
@@ -37,9 +37,9 @@ class Cursor
 
             alias_method :interrupted?, :interrupted
 
-            def initialize(redis_key:, target_scope:, usage_window:)
+            def initialize(redis_key:, target_model:, usage_window:)
               @redis_key = redis_key
-              @target_scope = target_scope
+              @target_model = target_model
               @usage_window = usage_window
               @interrupted = false
 
@@ -80,7 +80,7 @@ def save!
 
             private
 
-            attr_reader :redis_key, :target_scope
+            attr_reader :redis_key, :target_model
 
             def fetch_initial_attributes!
               data = Gitlab::Redis::SharedState.with do |redis|
@@ -104,7 +104,7 @@ def reset_last_usage_attributes
             end
 
             def max_target_id
-              target_scope.maximum(:id).to_i
+              target_model.maximum(:id).to_i
             end
             strong_memoize_attr :max_target_id
 
diff --git a/lib/gitlab/database/bulk_update.rb b/lib/gitlab/database/bulk_update.rb
index 51f39419ddb3f7..aa1dd059b4d7af 100644
--- a/lib/gitlab/database/bulk_update.rb
+++ b/lib/gitlab/database/bulk_update.rb
@@ -29,6 +29,11 @@ module Database
     # values. Enums/state fields must be translated into their underlying
     # representations, for example, and no hooks will be called.
     #
+    # This tool does not support all column types. For example,
+    # ActiveModel::Type.lookup(column.type) raises an exception when
+    # the column type is `:timestamptz` (timestamp with time zone).
+    #
+    #
     module BulkUpdate
       LIST_SEPARATOR = ', '
diff --git a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb
index a71189db550361..9bdba2b1200c0d 100644
--- a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb
+++ b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb
@@ -11,7 +11,7 @@
   let_it_be(:expected_usage_counts) { resources.zip([3, 17, 0, 1, 26]).to_h }
 
   let(:usage_model) { Ci::Catalog::Resources::Components::Usage }
-  let(:target_scope) { Ci::Catalog::Resource }
+  let(:target_model) { Ci::Catalog::Resource }
   let(:group_by_column) { :catalog_resource_id }
   let(:lease_key) { 'my_lease_key' }
 
@@ -46,13 +46,13 @@
       batched_usage_counts, result = run_new_aggregator_each_batch
 
       expect(batched_usage_counts).to eq(expected_batched_usage_counts)
-      expect(result.total_targets_completed).to eq(target_scope.count)
+      expect(result.total_targets_completed).to eq(target_model.count)
       expect(result.cursor_attributes).to eq({
         target_id: 0,
         usage_window: usage_window.to_h,
         last_used_by_project_id: 0,
         last_usage_count: 0,
-        max_target_id: target_scope.maximum(:id).to_i
+        max_target_id: target_model.maximum(:id).to_i
       })
     end
   end
@@ -77,24 +77,24 @@
      expect(batched_usage_counts).to eq([])
       expect(result.total_targets_completed).to eq(0)
       expect(result.cursor_attributes).to eq({
-        target_id: target_scope.first.id,
+        target_id: target_model.first.id,
         usage_window: usage_window.to_h,
         last_used_by_project_id: 2,
         last_usage_count: 2,
-        max_target_id: target_scope.maximum(:id).to_i
+        max_target_id: target_model.maximum(:id).to_i
       })
 
       # On 2nd run, we get the complete usage count for the first catalog resource and advance the cursor
       batched_usage_counts, result = run_new_aggregator_each_batch
 
-      expect(batched_usage_counts).to eq([{ target_scope.first => 3 }])
+      expect(batched_usage_counts).to eq([{ target_model.first => 3 }])
       expect(result.total_targets_completed).to eq(1)
       expect(result.cursor_attributes).to eq({
-        target_id: target_scope.first.id + 1,
+        target_id: target_model.first.id + 1,
         usage_window: usage_window.to_h,
         last_used_by_project_id: 0,
         last_usage_count: 0,
-        max_target_id: target_scope.maximum(:id).to_i
+        max_target_id: target_model.maximum(:id).to_i
       })
 
       all_batched_usage_counts = batched_usage_counts + repeat_new_aggregator_each_batch_until_done
@@ -112,14 +112,14 @@
       expect(batched_usage_counts).to eq([])
       expect(result.total_targets_completed).to eq(0)
       expect(result.cursor_attributes).to eq({
-        target_id: target_scope.first.id,
+        target_id: target_model.first.id,
         usage_window: usage_window.to_h,
         last_used_by_project_id: 2,
         last_usage_count: 2,
-        max_target_id: target_scope.maximum(:id).to_i
+        max_target_id: target_model.maximum(:id).to_i
       })
 
-      target_scope.first.delete
+      target_model.first.delete
       all_batched_usage_counts = repeat_new_aggregator_each_batch_until_done
 
       batched_usage_counts_merged = all_batched_usage_counts.reduce(&:merge)
@@ -168,7 +168,7 @@
 
   def run_new_aggregator_each_batch
     aggregator = described_class.new(
-      target_scope: target_scope,
+      target_model: target_model,
       group_by_column: group_by_column,
       usage_start_date: usage_start_date,
       usage_end_date: usage_end_date,
diff --git a/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb
index 30e632d8828809..6294c0576fcb3f 100644
--- a/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb
+++ b/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb
@@ -5,7 +5,7 @@
 
 RSpec.describe Gitlab::Ci::Components::Usages::Aggregators::Cursor, :clean_gitlab_redis_shared_state,
   feature_category: :pipeline_composition do
   let(:redis_key) { 'my_redis_key:cursor' }
-  let(:target_scope) { class_double(Ci::Catalog::Resource, maximum: max_target_id) }
+  let(:target_model) { class_double(Ci::Catalog::Resource, maximum: max_target_id) }
   let(:max_target_id) { initial_redis_attributes[:target_id] }
   let(:usage_window) { described_class::Window.new(Date.parse('2024-01-08'), Date.parse('2024-01-14')) }
@@ -20,7 +20,7 @@
     }
   end
 
-  subject(:cursor) { described_class.new(redis_key: redis_key, target_scope: target_scope, usage_window: usage_window) }
+  subject(:cursor) { described_class.new(redis_key: redis_key, target_model: target_model, usage_window: usage_window) }
 
   before do
     Gitlab::Redis::SharedState.with do |redis|
diff --git a/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb b/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb
index 59b22d544bc2bd..4bc81afcdf72ee 100644
--- a/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb
+++ b/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb
@@ -6,7 +6,7 @@
   feature_category: :pipeline_composition do
   let_it_be(:usage_start_date) { Date.today - described_class::WINDOW_LENGTH }
   let_it_be(:usage_end_date) { Date.today - 1.day }
-  let_it_be(:initial_usage_count_updated_at) { Date.yesterday.to_time }
+  let_it_be(:initial_usage_count_updated_at) { usage_end_date.to_time }
 
   let_it_be(:resources) { create_list(:ci_catalog_resource, 4).sort_by(&:id) }
   let_it_be(:expected_ordered_usage_counts) { [3, 1, 0, 15] }
@@ -17,7 +17,7 @@
       usage_window: usage_window_hash,
       last_used_by_project_id: 0,
       last_usage_count: 0,
-      max_target_id: 0
+      max_target_id: Ci::Catalog::Resource.maximum(:id).to_i
     }
   end
 
@@ -85,54 +85,19 @@
       end
     end
 
-    context 'when some catalog resources have already been processed today' do
-      before_all do
-        resources.first(2).each do |resource|
-          resource.update!(last_30_day_usage_count_updated_at: Date.today.to_time)
-        end
-      end
-
-      it 'aggregates usage data for only the unprocessed catalog resources' do
-        response = service.execute
-
-        expect(response).to be_success
-        expect(response.payload).to eq({
-          total_targets_completed: 2,
-          cursor_attributes: expected_cursor_attributes
-        })
-      end
-
-      it 'calls BulkUpdate once and updates usage counts for only the unprocessed catalog resources' do
-        expect(Gitlab::Database::BulkUpdate).to receive(:execute).once.and_call_original
-
-        service.execute
-
-        expect(ordered_usage_counts).to eq([0, 0, *expected_ordered_usage_counts.last(2)])
-        expect(ordered_usage_counts_updated_at).to eq([[Date.today.to_time] * 2, [Time.current] * 2].flatten)
-      end
-    end
-
     context 'when all catalog resources have already been processed today' do
       before_all do
         Ci::Catalog::Resource.update_all(last_30_day_usage_count_updated_at: Date.today.to_time)
       end
 
       it 'does not aggregate usage data' do
+        expect(Gitlab::Ci::Components::Usages::Aggregator).not_to receive(:new)
+
         response = service.execute
 
         expect(response).to be_success
-        expect(response.payload).to eq({
-          total_targets_completed: 0,
-          cursor_attributes: expected_cursor_attributes
-        })
-      end
-
-      it 'does not call BulkUpdate' do
-        expect(Gitlab::Database::BulkUpdate).not_to receive(:execute)
-
-        expect { service.execute }
-          .to not_change { ordered_usage_counts }
-          .and not_change { ordered_usage_counts_updated_at }
+        expect(response.message).to eq("Processing complete for #{Date.today}")
+        expect(response.payload).to eq({})
       end
     end
   end
diff --git a/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb b/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb
index 83367a23749b77..973969f58a2681 100644
--- a/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb
+++ b/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb
@@ -62,7 +62,7 @@
         usage_window: usage_window_hash,
         last_used_by_project_id: 0,
         last_usage_count: 0,
-        max_target_id: 0
+        max_target_id: Ci::Catalog::Resource.maximum(:id).to_i
       })
 
       perform
-- 
GitLab

From 531468ab58b734440a5c6d50d7a5cce9395935a4 Mon Sep 17 00:00:00 2001
From: lma-git
Date: Fri, 7 Jun 2024 09:50:43 -0700
Subject: [PATCH 4/4] Add test to usage service spec

Re-added the test case where some catalog resources have already been
processed today.
---
 ...aggregate_last30_day_usage_service_spec.rb | 40 +++++++++++++++----
 1 file changed, 33 insertions(+), 7 deletions(-)

diff --git a/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb b/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb
index 4bc81afcdf72ee..47d2a3dff3308d 100644
--- a/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb
+++ b/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb
@@ -51,16 +51,20 @@
   describe '#execute' do
     context 'when the aggregator is not interrupted' do
-      it 'aggregates usage data for all catalog resources' do
-        response = service.execute
+      shared_examples 'aggregates usage data for all catalog resources' do
+        it 'returns a success response' do
+          response = service.execute
 
-        expect(response).to be_success
-        expect(response.payload).to eq({
-          total_targets_completed: 4,
-          cursor_attributes: expected_cursor_attributes
-        })
+          expect(response).to be_success
+          expect(response.payload).to eq({
+            total_targets_completed: 4,
+            cursor_attributes: expected_cursor_attributes
+          })
+        end
       end
 
+      it_behaves_like 'aggregates usage data for all catalog resources'
+
       it 'calls BulkUpdate once and updates usage counts for all catalog resources' do
         expect(Gitlab::Database::BulkUpdate).to receive(:execute).once.and_call_original
 
         service.execute
@@ -75,6 +79,8 @@
         stub_const('Gitlab::Ci::Components::Usages::Aggregator::TARGET_BATCH_SIZE', 2)
       end
 
+      it_behaves_like 'aggregates usage data for all catalog resources'
+
       it 'calls BulkUpdate twice and updates usage counts for all catalog resources' do
         expect(Gitlab::Database::BulkUpdate).to receive(:execute).twice.and_call_original
 
@@ -85,6 +91,26 @@
       end
     end
 
+    context 'when some catalog resources have already been processed today' do
+      before_all do
+        resources.first(2).each do |resource|
+          resource.update!(last_30_day_usage_count_updated_at: Date.today.to_time)
+        end
+      end
+
+      # The cursor has not advanced, so it still processes all targets
+      it_behaves_like 'aggregates usage data for all catalog resources'
+
+      it 'calls BulkUpdate once and updates usage counts for all catalog resources' do
+        expect(Gitlab::Database::BulkUpdate).to receive(:execute).once.and_call_original
+
+        service.execute
+
+        expect(ordered_usage_counts).to eq(expected_ordered_usage_counts)
+        expect(ordered_usage_counts_updated_at).to match_array([Time.current] * 4)
+      end
+    end
+
     context 'when all catalog resources have already been processed today' do
       before_all do
         Ci::Catalog::Resource.update_all(last_30_day_usage_count_updated_at: Date.today.to_time)
-- 
GitLab
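
Reviewer note: the STOPPING docs in the Aggregator and the service spec above
both rely on a daily stop condition (the "Processing complete for <date>"
response). A minimal sketch of how that guard can look, assuming it keys off
the last_30_day_usage_count_updated_at column the service already writes; the
done_processing? helper name and its placement in the service are illustrative
assumptions, not necessarily the merged implementation:

  # Sketch only: mirrors the behavior exercised in the spec, where the service
  # returns early and never instantiates the Aggregator once every catalog
  # resource has been counted today.
  def done_processing?
    # A resource counts as processed once its timestamp has been bumped to
    # today; no stale timestamps means there is nothing left to do.
    Ci::Catalog::Resource
      .where('last_30_day_usage_count_updated_at < ?', Date.today.to_time)
      .none?
  end

  def execute
    return ServiceResponse.success(message: "Processing complete for #{Date.today}") if done_processing?

    # ... otherwise build and run Gitlab::Ci::Components::Usages::Aggregator
    # as introduced in PATCH 1/4 ...
  end

This keeps the worker's 4-minute cadence cheap for the rest of the day: once
all counts are written, each run reduces to a single indexed query against
index_catalog_resources_on_last_30_day_usage_count_updated_at.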