diff --git a/app/models/ci/catalog/resource.rb b/app/models/ci/catalog/resource.rb
index 27fa7c2b5731b40d76d860b5a0dbe49e43dc95bc..8d3d10e418d4d5f182cab4a0bbb555cf259a1c91 100644
--- a/app/models/ci/catalog/resource.rb
+++ b/app/models/ci/catalog/resource.rb
@@ -49,8 +49,7 @@ class Resource < ::ApplicationRecord
         )
       end
 
-      # TODO: The usage counts will be populated by a worker that aggregates the data daily.
-      # See https://gitlab.com/gitlab-org/gitlab/-/issues/452545.
+      # The usage counts are updated daily by Ci::Catalog::Resources::AggregateLast30DayUsageWorker
       scope :order_by_last_30_day_usage_count_desc, -> { reorder(last_30_day_usage_count: :desc) }
       scope :order_by_last_30_day_usage_count_asc, -> { reorder(last_30_day_usage_count: :asc) }
 
diff --git a/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb b/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..b6cf7ed6990c76490fdd83b3b3bc4c1e7e91bc30
--- /dev/null
+++ b/app/services/ci/catalog/resources/aggregate_last30_day_usage_service.rb
@@ -0,0 +1,75 @@
+# frozen_string_literal: true
+
+module Ci
+  module Catalog
+    module Resources
+      # This service aggregates CI component usage data and updates `last_30_day_usage_count` for
+      # each catalog resource daily. It utilizes Gitlab::Ci::Components::Usages::Aggregator which
+      # implements a "continue later" mechanism to process the data in time-boxed jobs.
+      # rubocop: disable CodeReuse/ActiveRecord -- Custom queries required
+      class AggregateLast30DayUsageService
+        include Gitlab::Utils::StrongMemoize
+
+        TARGET_MODEL = Ci::Catalog::Resource
+        GROUP_BY_COLUMN = :catalog_resource_id
+        WINDOW_LENGTH = 30.days
+
+        def execute
+          return ServiceResponse.success(message: "Processing complete for #{today}") if done_processing?
+
+          aggregator = Gitlab::Ci::Components::Usages::Aggregator.new(
+            target_model: TARGET_MODEL,
+            group_by_column: GROUP_BY_COLUMN,
+            usage_start_date: today - WINDOW_LENGTH,
+            usage_end_date: today - 1.day,
+            lease_key: lease_key
+          )
+
+          result = aggregator.each_batch do |usage_counts|
+            save_usage_counts!(usage_counts)
+          end
+
+          if result
+            ServiceResponse.success(message: 'Targets processed', payload: result.to_h)
+          else
+            ServiceResponse.success(message: 'Lease taken', payload: { lease_key: lease_key })
+          end
+        end
+
+        private
+
+        def done_processing?
+          min_updated_at = TARGET_MODEL.minimum(:last_30_day_usage_count_updated_at)
+          return true unless min_updated_at
+
+          min_updated_at >= today.to_time
+        end
+
+        def save_usage_counts!(usage_counts)
+          mapping = usage_counts.transform_values { |v| { last_30_day_usage_count: v } }
+          catalog_resource_ids = usage_counts.keys.map(&:id)
+
+          TARGET_MODEL.transaction do
+            Gitlab::Database::BulkUpdate.execute(%i[last_30_day_usage_count], mapping)
+
+            # Gitlab::Database::BulkUpdate does not support column type
+            # `:timestamptz` so we must update the timestamps separately.
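+            # (`update_all` below issues a single UPDATE and skips ActiveRecord
+            # callbacks/validations, which is acceptable here since only the
+            # timestamp column is touched.)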
+            TARGET_MODEL
+              .where(id: catalog_resource_ids)
+              .update_all(last_30_day_usage_count_updated_at: Time.current)
+          end
+        end
+
+        def today
+          Date.today
+        end
+        strong_memoize_attr :today
+
+        def lease_key
+          self.class.name
+        end
+      end
+      # rubocop: enable CodeReuse/ActiveRecord
+    end
+  end
+end
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index d086e5ad1225e8dcd81a27c7eddbb71574c308c2..8a62d27689f4394a5e90b972e9fb71a115fe35f7 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -246,6 +246,15 @@
   :weight: 1
   :idempotent: false
   :tags: []
+- :name: cronjob:ci_catalog_resources_aggregate_last30_day_usage
+  :worker_name: Ci::Catalog::Resources::AggregateLast30DayUsageWorker
+  :feature_category: :pipeline_composition
+  :has_external_dependencies: false
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: cronjob:ci_catalog_resources_process_sync_events
   :worker_name: Ci::Catalog::Resources::ProcessSyncEventsWorker
   :feature_category: :pipeline_composition
diff --git a/app/workers/ci/catalog/resources/aggregate_last30_day_usage_worker.rb b/app/workers/ci/catalog/resources/aggregate_last30_day_usage_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..693f9363c1cb9e53ffa2a12df4f7d9910e4dd250
--- /dev/null
+++ b/app/workers/ci/catalog/resources/aggregate_last30_day_usage_worker.rb
@@ -0,0 +1,35 @@
+# frozen_string_literal: true
+
+module Ci
+  module Catalog
+    module Resources
+      # Multiple instances of this worker may be scheduled simultaneously, but only one
+      # can process data at a time. This is ensured by the exclusive lease guard in
+      # `Gitlab::Ci::Components::Usages::Aggregator`. The scheduling frequency should equal
+      # `Gitlab::Ci::Components::Usages::Aggregator::MAX_RUNTIME` so there is no time gap
+      # between job runs.
+      class AggregateLast30DayUsageWorker
+        include ApplicationWorker
+        include CronjobQueue # rubocop: disable Scalability/CronWorkerContext -- Periodic processing is required
+
+        feature_category :pipeline_composition
+
+        data_consistency :sticky
+        urgency :low
+        idempotent!
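+
+        # With MAX_RUNTIME at 4 minutes and the */4 cron cadence configured in
+        # 1_settings.rb, the dedup TTL below works out to 5 minutes
+        # (MAX_RUNTIME + 1.minute), so a run deduplicated while another is
+        # executing is rescheduled once rather than dropped.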
+        deduplicate :until_executed, if_deduplicated: :reschedule_once,
+          ttl: Gitlab::Ci::Components::Usages::Aggregator::WORKER_DEDUP_TTL
+
+        def perform
+          response = Ci::Catalog::Resources::AggregateLast30DayUsageService.new.execute
+
+          log_hash_metadata_on_done(
+            status: response.status,
+            message: response.message,
+            **response.payload
+          )
+        end
+      end
+    end
+  end
+end
diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb
index 1a37895853c4befe6a426c32823bc4d13159cc95..819d9ae311a8dd9be216210cfd35ce083eadff63 100644
--- a/config/initializers/1_settings.rb
+++ b/config/initializers/1_settings.rb
@@ -707,6 +707,9 @@
 Settings.cron_jobs['performance_bar_stats'] ||= {}
 Settings.cron_jobs['performance_bar_stats']['cron'] ||= '*/2 * * * *'
 Settings.cron_jobs['performance_bar_stats']['job_class'] = 'GitlabPerformanceBarStatsWorker'
+Settings.cron_jobs['ci_catalog_resources_aggregate_last30_day_usage_worker'] ||= {}
+Settings.cron_jobs['ci_catalog_resources_aggregate_last30_day_usage_worker']['cron'] ||= '*/4 * * * *'
+Settings.cron_jobs['ci_catalog_resources_aggregate_last30_day_usage_worker']['job_class'] = 'Ci::Catalog::Resources::AggregateLast30DayUsageWorker'
 
 Gitlab.ee do
   Settings.cron_jobs['analytics_devops_adoption_create_all_snapshots_worker'] ||= {}
diff --git a/db/post_migrate/20240602161102_index_last30_day_usage_count_updated_at_on_catalog_resources.rb b/db/post_migrate/20240602161102_index_last30_day_usage_count_updated_at_on_catalog_resources.rb
new file mode 100644
index 0000000000000000000000000000000000000000..d4444dc5bb4f4fe82617ce3dce707e92bc70cfbc
--- /dev/null
+++ b/db/post_migrate/20240602161102_index_last30_day_usage_count_updated_at_on_catalog_resources.rb
@@ -0,0 +1,17 @@
+# frozen_string_literal: true
+
+class IndexLast30DayUsageCountUpdatedAtOnCatalogResources < Gitlab::Database::Migration[2.2]
+  milestone '17.1'
+
+  disable_ddl_transaction!
+
+  INDEX_NAME = 'index_catalog_resources_on_last_30_day_usage_count_updated_at'
+
+  def up
+    add_concurrent_index :catalog_resources, :last_30_day_usage_count_updated_at, name: INDEX_NAME
+  end
+
+  def down
+    remove_concurrent_index_by_name :catalog_resources, INDEX_NAME
+  end
+end
diff --git a/db/post_migrate/20240602162649_change_index_p_catalog_resource_component_usages_on_catalog_resource_id.rb b/db/post_migrate/20240602162649_change_index_p_catalog_resource_component_usages_on_catalog_resource_id.rb
new file mode 100644
index 0000000000000000000000000000000000000000..119026e31ba548d8e1b0b49d5feabfddcc7c0a2a
--- /dev/null
+++ b/db/post_migrate/20240602162649_change_index_p_catalog_resource_component_usages_on_catalog_resource_id.rb
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+class ChangeIndexPCatalogResourceComponentUsagesOnCatalogResourceId < Gitlab::Database::Migration[2.2]
+  include Gitlab::Database::PartitioningMigrationHelpers
+
+  milestone '17.1'
+
+  disable_ddl_transaction!
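+
+  # The concurrent partitioned index helpers used below cannot run inside a
+  # transaction block, hence `disable_ddl_transaction!` above.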
+
+  TABLE_NAME = :p_catalog_resource_component_usages
+  COLUMN_NAMES = [:catalog_resource_id, :used_by_project_id, :used_date]
+  INDEX_NAME = 'idx_component_usages_on_catalog_resource_used_by_proj_used_date'
+
+  OLD_COLUMN_NAMES = [:catalog_resource_id]
+  OLD_INDEX_NAME = 'idx_p_catalog_resource_component_usages_on_catalog_resource_id'
+
+  def up
+    add_concurrent_partitioned_index(TABLE_NAME, COLUMN_NAMES, name: INDEX_NAME)
+    remove_concurrent_partitioned_index_by_name(TABLE_NAME, OLD_INDEX_NAME)
+  end
+
+  def down
+    add_concurrent_partitioned_index(TABLE_NAME, OLD_COLUMN_NAMES, name: OLD_INDEX_NAME)
+    remove_concurrent_partitioned_index_by_name(TABLE_NAME, INDEX_NAME)
+  end
+end
diff --git a/db/schema_migrations/20240602161102 b/db/schema_migrations/20240602161102
new file mode 100644
index 0000000000000000000000000000000000000000..78e531a05424752ce25737361c6d5bd4be039154
--- /dev/null
+++ b/db/schema_migrations/20240602161102
@@ -0,0 +1 @@
+48e27a3376ea15329fed626a4839d5929affd797628e50b7c530741da92e8639
\ No newline at end of file
diff --git a/db/schema_migrations/20240602162649 b/db/schema_migrations/20240602162649
new file mode 100644
index 0000000000000000000000000000000000000000..b6298826b568fa8cbe11886599b5931a5a8c22e0
--- /dev/null
+++ b/db/schema_migrations/20240602162649
@@ -0,0 +1 @@
+87fd0c7f40c011772c12e74f93649bb2fa6c130da0f7a877f423099f94cebb35
\ No newline at end of file
diff --git a/db/structure.sql b/db/structure.sql
index 2be68f9f41fb794f4ad648da0c951077a3c60de0..7d57c87429a8b1c61a3ed8c96e00948246d75e94 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -24652,6 +24652,8 @@ CREATE INDEX idx_ci_running_builds_on_runner_type_and_owner_xid_and_id ON ci_run
 CREATE INDEX idx_compliance_security_policies_on_policy_configuration_id ON compliance_framework_security_policies USING btree (policy_configuration_id);
 
+CREATE INDEX idx_component_usages_on_catalog_resource_used_by_proj_used_date ON ONLY p_catalog_resource_component_usages USING btree (catalog_resource_id, used_by_project_id, used_date);
+
 CREATE UNIQUE INDEX idx_component_usages_on_component_used_by_project_and_used_date ON ONLY p_catalog_resource_component_usages USING btree (component_id, used_by_project_id, used_date);
 
 CREATE INDEX idx_container_exp_policies_on_project_id_next_run_at ON container_expiration_policies USING btree (project_id, next_run_at) WHERE (enabled = true);
@@ -24768,8 +24770,6 @@ CREATE INDEX idx_on_protected_branch ON approval_group_rules_protected_branches
 CREATE INDEX idx_open_issues_on_project_and_confidential_and_author_and_id ON issues USING btree (project_id, confidential, author_id, id) WHERE (state_id = 1);
 
-CREATE INDEX idx_p_catalog_resource_component_usages_on_catalog_resource_id ON ONLY p_catalog_resource_component_usages USING btree (catalog_resource_id);
-
 CREATE INDEX idx_packages_debian_group_component_files_on_architecture_id ON packages_debian_group_component_files USING btree (architecture_id);
 
 CREATE INDEX idx_packages_debian_project_component_files_on_architecture_id ON packages_debian_project_component_files USING btree (architecture_id);
@@ -25282,6 +25282,8 @@ CREATE INDEX index_catalog_resource_versions_on_resource_id_and_released_at ON c
 CREATE INDEX index_catalog_resources_on_last_30_day_usage_count ON catalog_resources USING btree (last_30_day_usage_count) WHERE (state = 1);
 
+CREATE INDEX index_catalog_resources_on_last_30_day_usage_count_updated_at ON catalog_resources USING btree (last_30_day_usage_count_updated_at);
+
 CREATE UNIQUE INDEX index_catalog_resources_on_project_id ON catalog_resources USING btree (project_id);
 
 CREATE INDEX index_catalog_resources_on_search_vector ON catalog_resources USING gin (search_vector);
diff --git a/lib/gitlab/ci/components/usages/aggregator.rb b/lib/gitlab/ci/components/usages/aggregator.rb
index 12fa36f9ec95b5fda0ca6f5dcdac4c331c782546..ddb6121831a1ccc61bf5aebbd808142fb5e89cf2 100644
--- a/lib/gitlab/ci/components/usages/aggregator.rb
+++ b/lib/gitlab/ci/components/usages/aggregator.rb
@@ -7,30 +7,38 @@ module Usages
         # Component usage is defined as the number of unique `used_by_project_id`s in the table
         # `p_catalog_resource_component_usages` for a given scope.
         #
-        # This aggregator iterates through the target scope in batches. For each target ID, it collects
-        # the usage count using `distinct_each_batch` for the given usage window. Since this process can
-        # be interrupted when it reaches MAX_RUNTIME, we utilize a Redis cursor so the aggregator can
-        # resume from where it left off on each run. We collect the count in Rails because the SQL query
-        # `COUNT(DISTINCT(*))` is not performant when the dataset is large.
+        # This aggregator is intended to be run in a scheduled cron job. It implements a "continue later"
+        # mechanism with a Redis cursor, which enables the work to continue from where it was last interrupted
+        # on each run. It iterates through the target table in batches, in order of ID ascending. For each
+        # target ID, it collects the usage count using `distinct_each_batch` for the given usage window.
+        # We collect the count in Rails because the SQL query `COUNT(DISTINCT(*))` is not performant when the
+        # data volume is large.
         #
-        # RUNTIME: The actual total runtime will be slightly longer than MAX_RUNTIME because
+        # RUNTIME: The actual total runtime will be longer than MAX_RUNTIME because
         #          it depends on the execution time of `&usage_counts_block`.
         # EXCLUSIVE LEASE: This aggregator is protected from parallel processing with an exclusive lease guard.
         # WORKER: The worker running this service should be scheduled at the same cadence as MAX_RUNTIME, with:
-        #         deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: LEASE_TIMEOUT
+        #         deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: WORKER_DEDUP_TTL
+        # STOPPING: When the aggregator's cursor advances past the max target_id, it resets to 0. This means
+        #           it may reprocess targets that have already been processed for the given usage window.
+        #           To minimize redundant reprocessing, you should prevent the aggregator from running once it
+        #           meets a certain stop condition (e.g. when all targets have been marked as "processed").
         #
         ##### Usage
         #
         # each_batch:
-        #   - Yields each batch of `usage_counts` to the given block.
-        #   - The block should be able to handle targets that might be reprocessed multiple times.
+        #   - Yields each batch of `usage_counts` to the given block. The block should:
+        #     - Be able to handle targets that might be reprocessed multiple times.
+        #     - Not exceed 1 minute in execution time.
         #   - `usage_counts` format: { target_object1 => 100, target_object2 => 200, ... }
-        #   - If the lease is obtained, returns a Result containing the `cursor` object and
-        #     `total_targets_completed`. Otherwise, returns nil.
+        #   - If the lease is obtained, returns a Result containing `total_targets_completed` and
+        #     `cursor_attributes`. Otherwise, returns nil.
        #
        # Example:
+        #   return if done_processing?
+        #
         #   aggregator = Gitlab::Ci::Components::Usages::Aggregator.new(
-        #     target_scope: Ci::Catalog::Resource.scope_to_get_only_unprocessed_targets,
+        #     target_model: Ci::Catalog::Resource,
         #     group_by_column: :catalog_resource_id,
         #     usage_start_date: Date.today - 30.days,
         #     usage_end_date: Date.today - 1.day,
@@ -43,37 +51,32 @@
         #
         ##### Parameters
         #
-        # target_scope:
-        #   - ActiveRecord relation to retrieve the target IDs. Processed in order of ID ascending.
-        #   - The target model class should have `include EachBatch`.
-        #   - When cursor.target_id gets reset to 0, the aggregator may reprocess targets that have
-        #     already been processed for the given usage window. To minimize redundant reprocessing,
-        #     add a limiting condition to the target scope so it only retrieves unprocessed targets.
-        # group_by_column: This should be the usage table's foreign key of the target_scope.
+        # target_model: Target model to iterate through. Model class should contain `include EachBatch`.
+        # group_by_column: This should be the usage table's foreign key of the target_model.
         # usage_start_date & usage_end_date: Date objects specifying the window of usage data to aggregate.
         # lease_key: Used for obtaining an exclusive lease. Also used as part of the cursor Redis key.
         #
         # rubocop: disable CodeReuse/ActiveRecord -- Custom queries required for data processing
         class Aggregator
-          include Gitlab::Utils::StrongMemoize
           include ExclusiveLeaseGuard
 
-          Result = Struct.new(:cursor, :total_targets_completed, keyword_init: true)
+          Result = Struct.new(:total_targets_completed, :cursor_attributes, keyword_init: true)
 
           TARGET_BATCH_SIZE = 1000
           DISTINCT_USAGE_BATCH_SIZE = 100
           MAX_RUNTIME = 4.minutes # Should be >= job scheduling frequency so there is no gap between job runs
-          LEASE_TIMEOUT = 5.minutes # Should be MAX_RUNTIME + extra time to execute `&usage_counts_block`
+          WORKER_DEDUP_TTL = MAX_RUNTIME + 1.minute # Includes extra time to execute `&usage_counts_block`
+          LEASE_TIMEOUT = 10.minutes
 
-          def initialize(target_scope:, group_by_column:, usage_start_date:, usage_end_date:, lease_key:)
-            @target_scope = target_scope
+          def initialize(target_model:, group_by_column:, usage_start_date:, usage_end_date:, lease_key:)
+            @target_model = target_model
             @group_by_column = group_by_column
             @lease_key = lease_key # Used by ExclusiveLeaseGuard
             @runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(MAX_RUNTIME)
 
             @cursor = Aggregators::Cursor.new(
               redis_key: "#{lease_key}:cursor",
-              target_scope: target_scope,
+              target_model: target_model,
               usage_window: Aggregators::Cursor::Window.new(usage_start_date, usage_end_date)
             )
           end
@@ -82,17 +85,18 @@ def each_batch(&usage_counts_block)
             try_obtain_lease do
               total_targets_completed = process_targets(&usage_counts_block)
 
-              Result.new(cursor: cursor, total_targets_completed: total_targets_completed)
+              Result.new(total_targets_completed: total_targets_completed, cursor_attributes: cursor.attributes)
             end
           end
 
           private
 
-          attr_reader :target_scope, :group_by_column, :cursor, :runtime_limiter
+          attr_reader :target_model, :group_by_column, :cursor, :runtime_limiter
 
           def process_targets
-            # Restore the scope from cursor so we can resume from the last run
-            restored_target_scope = target_scope.where('id >= ?', cursor.target_id)
+            # Restore the scope from cursor so we can resume from the last run. `cursor.target_id` is 0
+            # when the Redis cursor is first initialized or when it advances past the max target ID.
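+            # (For example, if `cursor.target_id` is 42, this run resumes processing
+            # at `WHERE id >= 42`.)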
+            restored_target_scope = target_model.where('id >= ?', cursor.target_id)
 
             total_targets_completed = 0
 
             restored_target_scope.each_batch(of: TARGET_BATCH_SIZE) do |targets_relation|
diff --git a/lib/gitlab/ci/components/usages/aggregators/cursor.rb b/lib/gitlab/ci/components/usages/aggregators/cursor.rb
index b04ed03c76bdfca08f02a507d7f3bb36935c9100..d58ddf1c40cb9170efacb971532ebf89afffc4ac 100644
--- a/lib/gitlab/ci/components/usages/aggregators/cursor.rb
+++ b/lib/gitlab/ci/components/usages/aggregators/cursor.rb
@@ -37,9 +37,9 @@ class Cursor
 
           alias_method :interrupted?, :interrupted
 
-          def initialize(redis_key:, target_scope:, usage_window:)
+          def initialize(redis_key:, target_model:, usage_window:)
             @redis_key = redis_key
-            @target_scope = target_scope
+            @target_model = target_model
             @usage_window = usage_window
             @interrupted = false
 
@@ -65,26 +65,22 @@ def advance
           def attributes
             {
               target_id: target_id,
-              usage_window: usage_window,
+              usage_window: usage_window.to_h,
               last_used_by_project_id: last_used_by_project_id,
-              last_usage_count: last_usage_count
+              last_usage_count: last_usage_count,
+              max_target_id: max_target_id
             }
           end
 
-          def max_target_id
-            target_scope.maximum(:id).to_i
-          end
-          strong_memoize_attr :max_target_id
-
           def save!
             Gitlab::Redis::SharedState.with do |redis|
-              redis.set(redis_key, attributes.to_json, ex: CURSOR_REDIS_KEY_TTL)
+              redis.set(redis_key, attributes.except(:max_target_id).to_json, ex: CURSOR_REDIS_KEY_TTL)
             end
           end
 
           private
 
-          attr_reader :redis_key, :target_scope
+          attr_reader :redis_key, :target_model
 
           def fetch_initial_attributes!
             data = Gitlab::Redis::SharedState.with do |redis|
@@ -107,6 +103,11 @@ def reset_last_usage_attributes
             @last_usage_count = 0
           end
 
+          def max_target_id
+            target_model.maximum(:id).to_i
+          end
+          strong_memoize_attr :max_target_id
+
           def parse_date(date_str)
             Date.parse(date_str) if date_str
           end
diff --git a/lib/gitlab/database/bulk_update.rb b/lib/gitlab/database/bulk_update.rb
index 51f39419ddb3f72a1662ecfe472be7055c59b138..aa1dd059b4d7afd5c01cea565a509e80b4eb579a 100644
--- a/lib/gitlab/database/bulk_update.rb
+++ b/lib/gitlab/database/bulk_update.rb
@@ -29,6 +29,11 @@ module Database
     # values. Enums/state fields must be translated into their underlying
     # representations, for example, and no hooks will be called.
     #
+    # This tool does not support all column types. For example,
+    # ActiveModel::Type.lookup(column.type) throws an exception when
+    # the column type is `:timestamptz` (timestamp with time zone).
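+    #
+    # As a workaround, update such columns in a separate statement (illustrative
+    # sketch with placeholder names):
+    #
+    #   Model.where(id: ids).update_all(updated_at: Time.current)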
+    #
+    #
     module BulkUpdate
       LIST_SEPARATOR = ', '
diff --git a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb
index f83f29939e37f563bc52698fcd7e823e86f09bd2..9bdba2b1200c0d5766c6a53cd87b629806d67d49 100644
--- a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb
+++ b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb
@@ -7,8 +7,11 @@
   let_it_be(:usage_start_date) { Date.today - 30.days }
   let_it_be(:usage_end_date) { Date.today - 1.day }
 
+  let_it_be(:resources) { create_list(:ci_catalog_resource, 5).sort_by(&:id) }
+  let_it_be(:expected_usage_counts) { resources.zip([3, 17, 0, 1, 26]).to_h }
+
   let(:usage_model) { Ci::Catalog::Resources::Components::Usage }
-  let(:target_scope) { Ci::Catalog::Resource }
+  let(:target_model) { Ci::Catalog::Resource }
   let(:group_by_column) { :catalog_resource_id }
   let(:lease_key) { 'my_lease_key' }
 
@@ -17,50 +20,39 @@
   end
 
   before_all do
-    # First catalog resource: 3 components and 3 usages per component on usage_end_date
-    version = create(:ci_catalog_resource_version)
-    create_list(:ci_catalog_resource_component, 3, version: version).each do |component|
-      (1..3).each do |k|
-        create(
-          :ci_catalog_resource_component_usage,
-          component: component,
-          used_date: usage_end_date,
-          used_by_project_id: k
-        )
-      end
-    end
-
-    # Create 4 more catalog resources, each with 1-4 components and 0-6 usages
-    # per component on different dates before and after usage_end_date
-    create_list(:ci_catalog_resource_version, 4).each_with_index do |version, i|
-      create_list(:ci_catalog_resource_component, i + 1, version: version).each_with_index do |component, j|
-        next unless j > 0
-
-        (1..j * 2).each do |k|
-          create(
-            :ci_catalog_resource_component_usage,
-            component: component,
-            used_date: usage_end_date - 3.days + k.days,
-            used_by_project_id: k
-          )
+    # Set up each resource with 1-5 versions, 1-5 components per version, and the expected usages per component
+    expected_usage_counts.each_with_index do |(resource, usage_count), i|
+      create_list(:ci_catalog_resource_version, i + 1, catalog_resource: resource).each do |version|
+        (1..i + 1).each do |j|
+          component = create(:ci_catalog_resource_component, version: version, name: "component#{j}")
+
+          (1..usage_count).each do |k|
+            # Inside the usage window
+            create(:ci_catalog_resource_component_usage,
+              component: component, used_date: usage_start_date, used_by_project_id: k)
+            # Outside the usage window
+            create(:ci_catalog_resource_component_usage,
+              component: component, used_date: usage_start_date - k.days, used_by_project_id: k)
+          end
         end
       end
     end
   end
 
   describe '#each_batch' do
-    shared_examples 'when the runtime limit is not reached' do
+    shared_examples 'when the aggregator is not interrupted' do
       it 'returns the expected result' do
         # We process all catalog resources and advance the cursor
         batched_usage_counts, result = run_new_aggregator_each_batch
 
         expect(batched_usage_counts).to eq(expected_batched_usage_counts)
-        expect(result.total_targets_completed).to eq(target_scope.count)
-        expect(result.cursor.attributes).to eq({
+        expect(result.total_targets_completed).to eq(target_model.count)
+        expect(result.cursor_attributes).to eq({
           target_id: 0,
-          usage_window: usage_window,
+          usage_window: usage_window.to_h,
           last_used_by_project_id: 0,
-          last_usage_count: 0
+          last_usage_count: 0,
+          max_target_id: target_model.maximum(:id).to_i
         })
       end
     end
@@ -70,9 +62,9 @@
       stub_const("#{described_class}::DISTINCT_USAGE_BATCH_SIZE", 2)
     end
 
-    it_behaves_like 'when the runtime limit is not reached'
+    it_behaves_like 'when the aggregator is not interrupted'
 
-    context 'when the runtime limit is reached' do
+    context 'when the aggregator is interrupted' do
       before do
         # Sets the aggregator to break after the first iteration on each run
         stub_const("#{described_class}::MAX_RUNTIME", 0)
       end
 
@@ -84,30 +76,32 @@
         expect(batched_usage_counts).to eq([])
         expect(result.total_targets_completed).to eq(0)
-        expect(result.cursor.attributes).to eq({
-          target_id: target_scope.first.id,
-          usage_window: usage_window,
+        expect(result.cursor_attributes).to eq({
+          target_id: target_model.first.id,
+          usage_window: usage_window.to_h,
           last_used_by_project_id: 2,
-          last_usage_count: 2
+          last_usage_count: 2,
+          max_target_id: target_model.maximum(:id).to_i
         })
 
         # On 2nd run, we get the complete usage count for the first catalog resource and advance the cursor
         batched_usage_counts, result = run_new_aggregator_each_batch
 
-        expect(batched_usage_counts).to eq([{ target_scope.first => 3 }])
+        expect(batched_usage_counts).to eq([{ target_model.first => 3 }])
         expect(result.total_targets_completed).to eq(1)
-        expect(result.cursor.attributes).to eq({
-          target_id: target_scope.first.id + 1,
-          usage_window: usage_window,
+        expect(result.cursor_attributes).to eq({
+          target_id: target_model.first.id + 1,
+          usage_window: usage_window.to_h,
           last_used_by_project_id: 0,
-          last_usage_count: 0
+          last_usage_count: 0,
+          max_target_id: target_model.maximum(:id).to_i
        })
 
         all_batched_usage_counts = batched_usage_counts + repeat_new_aggregator_each_batch_until_done
         batched_usage_counts_merged = all_batched_usage_counts.flatten.reduce(&:merge)
 
         expect(batched_usage_counts_merged.length).to eq(5)
-        expect(batched_usage_counts_merged).to eq(expected_batched_usage_counts_merged)
+        expect(batched_usage_counts_merged).to eq(expected_usage_counts)
       end
 
       context 'when a target is deleted between runs' do
@@ -117,20 +111,21 @@
           expect(batched_usage_counts).to eq([])
           expect(result.total_targets_completed).to eq(0)
-          expect(result.cursor.attributes).to eq({
-            target_id: target_scope.first.id,
-            usage_window: usage_window,
+          expect(result.cursor_attributes).to eq({
+            target_id: target_model.first.id,
+            usage_window: usage_window.to_h,
             last_used_by_project_id: 2,
-            last_usage_count: 2
+            last_usage_count: 2,
+            max_target_id: target_model.maximum(:id).to_i
           })
 
-          target_scope.first.delete
+          target_model.first.delete
 
           all_batched_usage_counts = repeat_new_aggregator_each_batch_until_done
           batched_usage_counts_merged = all_batched_usage_counts.reduce(&:merge)
 
           expect(batched_usage_counts_merged.length).to eq(4)
-          expect(batched_usage_counts_merged).to eq(expected_batched_usage_counts_merged)
+          expect(batched_usage_counts_merged).to eq(expected_usage_counts.except(resources.first))
         end
       end
 
@@ -142,13 +137,13 @@
           batched_usage_counts_merged = all_batched_usage_counts.reduce(&:merge)
 
           expect(batched_usage_counts_merged.length).to eq(5)
-          expect(batched_usage_counts_merged).to eq(expected_batched_usage_counts_merged)
+          expect(batched_usage_counts_merged).to eq(expected_usage_counts.transform_values { 0 })
         end
       end
     end
   end
 
-  it_behaves_like 'when the runtime limit is not reached'
+  it_behaves_like 'when the aggregator is not interrupted'
   it_behaves_like 'with multiple distinct usage batches'
 
   context 'with multiple target batches' do
     before do
       stub_const("#{described_class}::TARGET_BATCH_SIZE", 3)
     end
 
-    it_behaves_like 'when the runtime limit is not reached'
+    it_behaves_like 'when the aggregator is not interrupted'
     it_behaves_like 'with multiple distinct usage batches'
   end
 
@@ -173,7 +168,7 @@ def run_new_aggregator_each_batch
     aggregator = described_class.new(
-      target_scope: target_scope,
+      target_model: target_model,
       group_by_column: group_by_column,
       usage_start_date: usage_start_date,
       usage_end_date: usage_end_date,
@@ -190,25 +185,9 @@ def run_new_aggregator_each_batch
   end
 
   def expected_batched_usage_counts
-    batched_usage_counts = []
-
-    target_scope.each_batch(of: described_class::TARGET_BATCH_SIZE) do |targets|
-      usage_counts = usage_model
-        .includes(:catalog_resource)
-        .select('catalog_resource_id, COUNT(DISTINCT used_by_project_id) AS usage_count')
-        .where(used_date: usage_start_date..usage_end_date)
-        .where(group_by_column => targets)
-        .group(:catalog_resource_id)
-        .each_with_object({}) { |r, hash| hash[r.catalog_resource] = r.usage_count }
-
-      batched_usage_counts << targets.index_with { 0 }.merge(usage_counts)
+    resources.each_slice(described_class::TARGET_BATCH_SIZE).map do |batch|
+      expected_usage_counts.slice(*batch)
     end
-
-    batched_usage_counts
-  end
-
-  def expected_batched_usage_counts_merged
-    expected_batched_usage_counts.reduce(&:merge)
   end
 
   def repeat_new_aggregator_each_batch_until_done
     all_batched_usage_counts = []
 
     30.times do
       batched_usage_counts, result = run_new_aggregator_each_batch
       all_batched_usage_counts << batched_usage_counts
-      break if result.cursor.target_id == 0
+      break if result.cursor_attributes[:target_id] == 0
     end
 
     all_batched_usage_counts.flatten
diff --git a/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb
index d4d416e4e586bb20cb833fc388312feef0cbe303..6294c0576fcb3ff790391c9c660b3ac399dd9ae6 100644
--- a/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb
+++ b/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb
@@ -5,7 +5,7 @@ RSpec.describe Gitlab::Ci::Components::Usages::Aggregators::Cursor, :clean_gitlab_redis_shared_state,
   feature_category: :pipeline_composition do
   let(:redis_key) { 'my_redis_key:cursor' }
-  let(:target_scope) { class_double(Ci::Catalog::Resource, maximum: max_target_id) }
+  let(:target_model) { class_double(Ci::Catalog::Resource, maximum: max_target_id) }
   let(:max_target_id) { initial_redis_attributes[:target_id] }
 
   let(:usage_window) { described_class::Window.new(Date.parse('2024-01-08'), Date.parse('2024-01-14')) }
@@ -14,13 +14,13 @@
   let(:initial_redis_attributes) do
     {
       target_id: 1,
-      usage_window: initial_redis_usage_window,
+      usage_window: initial_redis_usage_window.to_h,
       last_used_by_project_id: 100,
       last_usage_count: 10
     }
   end
 
-  subject(:cursor) { described_class.new(redis_key: redis_key, target_scope: target_scope, usage_window: usage_window) }
+  subject(:cursor) { described_class.new(redis_key: redis_key, target_model: target_model, usage_window: usage_window) }
 
   before do
     Gitlab::Redis::SharedState.with do |redis|
@@ -30,7 +30,7 @@
 
   describe '.new' do
     it 'fetches and parses the attributes from Redis' do
-      expect(cursor.attributes).to eq(initial_redis_attributes)
+      expect(cursor.attributes).to include(initial_redis_attributes)
     end
 
     context 'when Redis usage_window is different than the given usage_window' do
@@ -39,9 +39,9 @@
      end
 
      it 'resets last usage attributes' do
-        expect(cursor.attributes).to eq({
+        expect(cursor.attributes).to include({
           target_id: initial_redis_attributes[:target_id],
-          usage_window: usage_window,
+          usage_window: usage_window.to_h,
           last_used_by_project_id: 0,
           last_usage_count: 0
         })
@@ -56,9 +56,9 @@
      end
 
      it 'sets target_id and last usage attributes to zero' do
-        expect(cursor.attributes).to eq({
+        expect(cursor.attributes).to include({
           target_id: 0,
-          usage_window: usage_window,
+          usage_window: usage_window.to_h,
           last_used_by_project_id: 0,
           last_usage_count: 0
         })
@@ -76,9 +76,9 @@
       )
 
       expect(cursor.interrupted?).to eq(true)
-      expect(cursor.attributes).to eq({
+      expect(cursor.attributes).to include({
         target_id: initial_redis_attributes[:target_id],
-        usage_window: usage_window,
+        usage_window: usage_window.to_h,
         last_used_by_project_id: initial_redis_attributes[:last_used_by_project_id] + 1,
         last_usage_count: initial_redis_attributes[:last_usage_count] + 1
       })
@@ -90,9 +90,9 @@
     it 'sets new target_id and resets last usage attributes' do
       cursor.target_id = initial_redis_attributes[:target_id] + 1
 
-      expect(cursor.attributes).to eq({
+      expect(cursor.attributes).to include({
         target_id: initial_redis_attributes[:target_id] + 1,
-        usage_window: usage_window,
+        usage_window: usage_window.to_h,
         last_used_by_project_id: 0,
         last_usage_count: 0
       })
@@ -101,7 +101,7 @@
 
     context 'when new target_id is the same as cursor target_id' do
       it 'does not change cursor attributes' do
-        expect(cursor.attributes).to eq(initial_redis_attributes)
+        expect(cursor.attributes).to include(initial_redis_attributes)
       end
     end
   end
@@ -115,9 +115,10 @@
       expect(cursor.attributes).to eq({
         target_id: initial_redis_attributes[:target_id] + 1,
-        usage_window: usage_window,
+        usage_window: usage_window.to_h,
         last_used_by_project_id: 0,
-        last_usage_count: 0
+        last_usage_count: 0,
+        max_target_id: max_target_id
       })
     end
   end
@@ -128,29 +129,17 @@
       expect(cursor.attributes).to eq({
         target_id: 0,
-        usage_window: usage_window,
+        usage_window: usage_window.to_h,
         last_used_by_project_id: 0,
-        last_usage_count: 0
+        last_usage_count: 0,
+        max_target_id: max_target_id
       })
      end
    end
  end
 
-  describe '#max_target_id' do
-    let(:target_scope) { Ci::Catalog::Resource }
-
-    before_all do
-      create(:ci_catalog_resource, id: 123)
-      create(:ci_catalog_resource, id: 100)
-    end
-
-    it 'returns maximum ID of the target scope' do
-      expect(cursor.max_target_id).to eq(123)
-    end
-  end
-
   describe '#save!' do
-    it 'saves cursor attributes to Redis as JSON' do
+    it 'saves cursor attributes except max_target_id to Redis as JSON' do
       cursor.target_id = 11
       cursor.interrupt!(
         last_used_by_project_id: 33,
diff --git a/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb b/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..47d2a3dff3308d24bc2749a17fe3f8d22456b9be
--- /dev/null
+++ b/spec/services/ci/catalog/resources/aggregate_last30_day_usage_service_spec.rb
@@ -0,0 +1,183 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Ci::Catalog::Resources::AggregateLast30DayUsageService, :clean_gitlab_redis_shared_state, :freeze_time,
+  feature_category: :pipeline_composition do
+  let_it_be(:usage_start_date) { Date.today - described_class::WINDOW_LENGTH }
+  let_it_be(:usage_end_date) { Date.today - 1.day }
+  let_it_be(:initial_usage_count_updated_at) { usage_end_date.to_time }
+
+  let_it_be(:resources) { create_list(:ci_catalog_resource, 4).sort_by(&:id) }
+  let_it_be(:expected_ordered_usage_counts) { [3, 1, 0, 15] }
+
+  let(:expected_cursor_attributes) do
+    {
+      target_id: 0,
+      usage_window: usage_window_hash,
+      last_used_by_project_id: 0,
+      last_usage_count: 0,
+      max_target_id: Ci::Catalog::Resource.maximum(:id).to_i
+    }
+  end
+
+  let(:usage_window_hash) { { start_date: usage_start_date, end_date: usage_end_date } }
+  let(:lease_key) { described_class.name }
+  let(:service) { described_class.new }
+
+  before_all do
+    # Set up each resource with 1-4 versions, 1-4 components per version, and the expected usages per component
+    expected_ordered_usage_counts.each_with_index do |usage_count, i|
+      resource = resources[i]
+
+      create_list(:ci_catalog_resource_version, i + 1, catalog_resource: resource).each do |version|
+        (1..i + 1).each do |j|
+          component = create(:ci_catalog_resource_component, version: version, name: "component#{j}")
+
+          (1..usage_count).each do |k|
+            # Inside the usage window
+            create(:ci_catalog_resource_component_usage,
+              component: component, used_date: usage_start_date, used_by_project_id: k)
+            # Outside the usage window
+            create(:ci_catalog_resource_component_usage,
+              component: component, used_date: usage_start_date - k.days, used_by_project_id: k)
+          end
+        end
+      end
+    end
+
+    Ci::Catalog::Resource.update_all(last_30_day_usage_count_updated_at: initial_usage_count_updated_at)
+  end
+
+  describe '#execute' do
+    context 'when the aggregator is not interrupted' do
+      shared_examples 'aggregates usage data for all catalog resources' do
+        it 'returns a success response' do
+          response = service.execute
+
+          expect(response).to be_success
+          expect(response.payload).to eq({
+            total_targets_completed: 4,
+            cursor_attributes: expected_cursor_attributes
+          })
+        end
+      end
+
+      it_behaves_like 'aggregates usage data for all catalog resources'
+
+      it 'calls BulkUpdate once and updates usage counts for all catalog resources' do
+        expect(Gitlab::Database::BulkUpdate).to receive(:execute).once.and_call_original
+
+        service.execute
+
+        expect(ordered_usage_counts).to eq(expected_ordered_usage_counts)
+        expect(ordered_usage_counts_updated_at).to match_array([Time.current] * 4)
+      end
+
+      context 'when there are two batches of usage counts' do
+        before do
+          stub_const('Gitlab::Ci::Components::Usages::Aggregator::TARGET_BATCH_SIZE', 2)
+        end
+
+        it_behaves_like 'aggregates usage data for all catalog resources'
+
+        it 'calls BulkUpdate twice and updates usage counts for all catalog resources' do
+          expect(Gitlab::Database::BulkUpdate).to receive(:execute).twice.and_call_original
+
+          service.execute
+
+          expect(ordered_usage_counts).to eq(expected_ordered_usage_counts)
+          expect(ordered_usage_counts_updated_at).to match_array([Time.current] * 4)
+        end
+      end
+
+      context 'when some catalog resources have already been processed today' do
+        before_all do
+          resources.first(2).each do |resource|
+            resource.update!(last_30_day_usage_count_updated_at: Date.today.to_time)
+          end
+        end
+
+        # The cursor has not advanced so it still processes all targets
+        it_behaves_like 'aggregates usage data for all catalog resources'
+
+        it 'calls BulkUpdate once and updates usage counts for all catalog resources' do
+          expect(Gitlab::Database::BulkUpdate).to receive(:execute).once.and_call_original
+
+          service.execute
+
+          expect(ordered_usage_counts).to eq(expected_ordered_usage_counts)
+          expect(ordered_usage_counts_updated_at).to match_array([Time.current] * 4)
+        end
+      end
+
+      context 'when all catalog resources have already been processed today' do
+        before_all do
+          Ci::Catalog::Resource.update_all(last_30_day_usage_count_updated_at: Date.today.to_time)
+        end
+
+        it 'does not aggregate usage data' do
+          expect(Gitlab::Ci::Components::Usages::Aggregator).not_to receive(:new)
+
+          response = service.execute
+
+          expect(response).to be_success
+          expect(response.message).to eq("Processing complete for #{Date.today}")
+          expect(response.payload).to eq({})
+        end
+      end
+    end
+
+    context 'when the aggregator is interrupted' do
+      before do
+        # Sets the aggregator to break after the first iteration on each run
+        stub_const('Gitlab::Ci::Components::Usages::Aggregator::MAX_RUNTIME', 0)
+        stub_const('Gitlab::Ci::Components::Usages::Aggregator::DISTINCT_USAGE_BATCH_SIZE', 2)
+      end
+
+      it 'updates the expected usage counts for each run' do
+        # On 1st run, we get an incomplete usage count for the first catalog resource so it is not saved
+        expect { service.execute }
+          .to not_change { ordered_usage_counts }
+          .and not_change { ordered_usage_counts_updated_at }
+
+        # On 2nd run, we get the complete usage count for the first catalog resource and save it
+        service.execute
+
+        expect(ordered_usage_counts).to eq([expected_ordered_usage_counts.first, 0, 0, 0])
+        expect(ordered_usage_counts_updated_at).to eq([Time.current, [initial_usage_count_updated_at] * 3].flatten)
+
+        # Execute service repeatedly until done
+        30.times do
+          response = service.execute
+          break if response.payload[:cursor_attributes][:target_id] == 0
+        end
+
+        expect(ordered_usage_counts).to eq(expected_ordered_usage_counts)
+        expect(ordered_usage_counts_updated_at).to match_array([Time.current] * 4)
+      end
+    end
+
+    context 'when another instance is running with the same lease key' do
+      it 'returns a success response with the lease key' do
+        lease = Gitlab::ExclusiveLease.new(lease_key, timeout: 1.minute).tap(&:try_obtain)
+        response = service.execute
+
+        expect(response).to be_success
+        expect(response.message).to eq('Lease taken')
+        expect(response.payload).to eq({ lease_key: lease_key })
+        lease.cancel
+      end
+    end
+  end
+
+  private
+
+  def ordered_usage_counts
+    Ci::Catalog::Resource.order(:id).pluck(:last_30_day_usage_count)
+  end
+
+  def ordered_usage_counts_updated_at
+    Ci::Catalog::Resource.order(:id).pluck(:last_30_day_usage_count_updated_at)
+  end
+end
diff --git a/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb b/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..973969f58a268194629f44360b596eaf2ab9f1be
--- /dev/null
+++ b/spec/workers/ci/catalog/resources/aggregate_last30_day_usage_worker_spec.rb
@@ -0,0 +1,71 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Ci::Catalog::Resources::AggregateLast30DayUsageWorker, feature_category: :pipeline_composition do
+  subject(:worker) { described_class.new }
+
+  include_examples 'an idempotent worker'
+
+  it 'has the `until_executed` deduplicate strategy' do
+    expect(described_class.get_deduplicate_strategy).to eq(:until_executed)
+  end
+
+  it 'has the option to reschedule once if deduplicated and a TTL' do
+    expect(described_class.get_deduplication_options).to include(
+      { if_deduplicated: :reschedule_once, ttl: Gitlab::Ci::Components::Usages::Aggregator::WORKER_DEDUP_TTL })
+  end
+
+  describe '#perform', :clean_gitlab_redis_shared_state, :freeze_time do
+    let_it_be(:usage_start_date) { Date.today - Ci::Catalog::Resources::AggregateLast30DayUsageService::WINDOW_LENGTH }
+    let_it_be(:usage_end_date) { Date.today - 1.day }
+
+    let_it_be(:resources) { create_list(:ci_catalog_resource, 3).sort_by(&:id) }
+    let_it_be(:expected_ordered_usage_counts) { [7, 12, 0] }
+
+    let(:usage_window_hash) { { start_date: usage_start_date, end_date: usage_end_date } }
+
+    subject(:perform) { worker.perform }
+
+    before_all do
+      # Set up each resource with 1 version and 1 component, and the expected usages per component
+      expected_ordered_usage_counts.each_with_index do |usage_count, i|
+        resource = resources[i]
+        version = create(:ci_catalog_resource_version, catalog_resource: resource)
+        component = create(:ci_catalog_resource_component, version: version)
+
+        (1..usage_count).each do |k|
+          create(:ci_catalog_resource_component_usage,
+            component: component, used_date: usage_start_date, used_by_project_id: k)
+        end
+      end
+    end
+
+    it 'aggregates and updates usage counts for all catalog resources' do
+      perform
+
+      ordered_usage_counts = Ci::Catalog::Resource.order(:id).pluck(:last_30_day_usage_count)
+      ordered_usage_counts_updated_at = Ci::Catalog::Resource.order(:id).pluck(:last_30_day_usage_count_updated_at)
+
+      expect(ordered_usage_counts).to eq(expected_ordered_usage_counts)
+      expect(ordered_usage_counts_updated_at).to match_array([Time.current] * 3)
+    end
+
+    it 'logs the service response' do
+      expect(worker).to receive(:log_hash_metadata_on_done)
+        .with(
+          status: :success,
+          message: 'Targets processed',
+          total_targets_completed: 3,
+          cursor_attributes: {
+            target_id: 0,
+            usage_window: usage_window_hash,
+            last_used_by_project_id: 0,
+            last_usage_count: 0,
+            max_target_id: Ci::Catalog::Resource.maximum(:id).to_i
+          })
+
+      perform
+    end
+  end
+end