diff --git a/app/models/ci/catalog/resource.rb b/app/models/ci/catalog/resource.rb index b50aac09cedb383597429f9f1ecbfb4f135b118d..77c5c61bb90996590069128a6504fd6a476452eb 100644 --- a/app/models/ci/catalog/resource.rb +++ b/app/models/ci/catalog/resource.rb @@ -11,6 +11,7 @@ class Resource < ::ApplicationRecord include PgFullTextSearchable include Gitlab::VisibilityLevel include Sortable + include EachBatch self.table_name = 'catalog_resources' diff --git a/app/models/ci/catalog/resources/components/usage.rb b/app/models/ci/catalog/resources/components/usage.rb index b721e57d0a75eadb56aa380555ce346b979dfc0c..07d785728e2b8f3c6c56f0a14deeb0c622e05f17 100644 --- a/app/models/ci/catalog/resources/components/usage.rb +++ b/app/models/ci/catalog/resources/components/usage.rb @@ -9,6 +9,7 @@ module Components # to preserve historical usage data. class Usage < ::ApplicationRecord include PartitionedTable + include EachBatch self.table_name = 'p_catalog_resource_component_usages' self.primary_key = :id diff --git a/lib/gitlab/ci/components/usages/aggregator.rb b/lib/gitlab/ci/components/usages/aggregator.rb new file mode 100644 index 0000000000000000000000000000000000000000..12fa36f9ec95b5fda0ca6f5dcdac4c331c782546 --- /dev/null +++ b/lib/gitlab/ci/components/usages/aggregator.rb @@ -0,0 +1,162 @@ +# frozen_string_literal: true + +module Gitlab + module Ci + module Components + module Usages + # Component usage is defined as the number of unique `used_by_project_id`s in the table + # `p_catalog_resource_component_usages` for a given scope. + # + # This aggregator iterates through the target scope in batches. For each target ID, it collects + # the usage count using `distinct_each_batch` for the given usage window. Since this process can + # be interrupted when it reaches MAX_RUNTIME, we utilize a Redis cursor so the aggregator can + # resume from where it left off on each run. 
We collect the count in Rails because the SQL query + # `COUNT(DISTINCT(*))` is not performant when the dataset is large. + # + # RUNTIME: The actual total runtime will be slightly longer than MAX_RUNTIME because + # it depends on the execution time of `&usage_counts_block`. + # EXCLUSIVE LEASE: This aggregator is protected from parallel processing with an exclusive lease guard. + # WORKER: The worker running this service should be scheduled at the same cadence as MAX_RUNTIME, with: + # deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: LEASE_TIMEOUT + # + ##### Usage + # + # each_batch: + # - Yields each batch of `usage_counts` to the given block. + # - The block should be able to handle targets that might be reprocessed multiple times. + # - `usage_counts` format: { target_object1 => 100, target_object2 => 200, ... } + # - If the lease is obtained, returns a Result containing the `cursor` object and + # `total_targets_completed`. Otherwise, returns nil. + # + # Example: + # aggregator = Gitlab::Ci::Components::Usages::Aggregator.new( + # target_scope: Ci::Catalog::Resource.scope_to_get_only_unprocessed_targets, + # group_by_column: :catalog_resource_id, + # usage_start_date: Date.today - 30.days, + # usage_end_date: Date.today - 1.day, + # lease_key: 'my_aggregator_service_lease_key' + # ) + # + # result = aggregator.each_batch do |usage_counts| + # # Bulk update usage counts in the database + # end + # + ##### Parameters + # + # target_scope: + # - ActiveRecord relation to retrieve the target IDs. Processed in order of ID ascending. + # - The target model class should have `include EachBatch`. + # - When cursor.target_id gets reset to 0, the aggregator may reprocess targets that have + # already been processed for the given usage window. To minimize redundant reprocessing, + # add a limiting condition to the target scope so it only retrieves unprocessed targets. 
+ # group_by_column: This should be the usage table's foreign key of the target_scope. + # usage_start_date & usage_end_date: Date objects specifying the window of usage data to aggregate. + # lease_key: Used for obtaining an exclusive lease. Also used as part of the cursor Redis key. + # + # rubocop: disable CodeReuse/ActiveRecord -- Custom queries required for data processing + class Aggregator + include Gitlab::Utils::StrongMemoize + include ExclusiveLeaseGuard + + Result = Struct.new(:cursor, :total_targets_completed, keyword_init: true) + + TARGET_BATCH_SIZE = 1000 + DISTINCT_USAGE_BATCH_SIZE = 100 + MAX_RUNTIME = 4.minutes # Should be >= job scheduling frequency so there is no gap between job runs + LEASE_TIMEOUT = 5.minutes # Should be MAX_RUNTIME + extra time to execute `&usage_counts_block` + + def initialize(target_scope:, group_by_column:, usage_start_date:, usage_end_date:, lease_key:) + @target_scope = target_scope + @group_by_column = group_by_column + @lease_key = lease_key # Used by ExclusiveLeaseGuard + @runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(MAX_RUNTIME) + + @cursor = Aggregators::Cursor.new( + redis_key: "#{lease_key}:cursor", + target_scope: target_scope, + usage_window: Aggregators::Cursor::Window.new(usage_start_date, usage_end_date) + ) + end + + def each_batch(&usage_counts_block) + try_obtain_lease do + total_targets_completed = process_targets(&usage_counts_block) + + Result.new(cursor: cursor, total_targets_completed: total_targets_completed) + end + end + + private + + attr_reader :target_scope, :group_by_column, :cursor, :runtime_limiter + + def process_targets + # Restore the scope from cursor so we can resume from the last run + restored_target_scope = target_scope.where('id >= ?', cursor.target_id) + total_targets_completed = 0 + + restored_target_scope.each_batch(of: TARGET_BATCH_SIZE) do |targets_relation| + usage_counts = aggregate_usage_counts(targets_relation) + + yield usage_counts if usage_counts.present? 
+ + total_targets_completed += usage_counts.length + break if runtime_limiter.over_time? + end + + cursor.advance unless cursor.interrupted? + cursor.save! + + total_targets_completed + end + + def aggregate_usage_counts(targets_relation) + usage_counts = {} + + targets_relation.order(:id).each do |target| + # When target.id is different from the cursor's target_id, it + # resets last_usage_count and last_used_by_project_id to 0. + cursor.target_id = target.id + + usage_scope = ::Ci::Catalog::Resources::Components::Usage + .where(group_by_column => cursor.target_id) + .where(used_date: cursor.usage_window.start_date..cursor.usage_window.end_date) + + # Restore the scope from cursor so we can resume from the last run if interrupted + restored_usage_scope = usage_scope.where('used_by_project_id > ?', cursor.last_used_by_project_id) + usage_counts[target] = cursor.last_usage_count + + restored_usage_scope + .distinct_each_batch(column: :used_by_project_id, of: DISTINCT_USAGE_BATCH_SIZE) do |usages_relation| + count = usages_relation.count + usage_counts[target] += count + + # If we're over time and count == batch size, it means there is likely another batch + # to process for the current target, so the usage count is incomplete. We store the + # last used_by_project_id and count so that we can resume counting on the next run. + if runtime_limiter.over_time? && count == DISTINCT_USAGE_BATCH_SIZE + cursor.interrupt!( + last_used_by_project_id: usages_relation.maximum(:used_by_project_id).to_i, + last_usage_count: usage_counts[target] + ) + + usage_counts.delete(target) # Remove the incomplete count + break + end + end + + break if runtime_limiter.over_time? 
+ end + + usage_counts + end + + def lease_timeout + LEASE_TIMEOUT + end + end + # rubocop: enable CodeReuse/ActiveRecord + end + end + end +end diff --git a/lib/gitlab/ci/components/usages/aggregators/cursor.rb b/lib/gitlab/ci/components/usages/aggregators/cursor.rb new file mode 100644 index 0000000000000000000000000000000000000000..b04ed03c76bdfca08f02a507d7f3bb36935c9100 --- /dev/null +++ b/lib/gitlab/ci/components/usages/aggregators/cursor.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +module Gitlab + module Ci + module Components + module Usages + module Aggregators + # This class represents a Redis cursor that keeps track of the data processing + # position and progression in Gitlab::Ci::Components::Usages::Aggregator. It + # updates and saves the attributes necessary for the aggregation to resume + # from where it was interrupted on its last run. + # + # The cursor's target_id is reset to 0 under these circumstances: + # 1. When the Redis cursor is first initialized. + # 2. When the Redis cursor expires or is lost and must be re-initialized. + # 3. When the cursor advances past max_target_id. + # + ##### Attributes + # + # target_id: The target ID from which to resume aggregating the usage counts. + # usage_window: The window of usage data to aggregate. + # last_used_by_project_id: The last used_by_project_id that was counted before interruption. + # last_usage_count: The last usage_count that was recorded before interruption. + # + # The last_used_by_project_id and last_usage_count only pertain to the exact target_id + # and usage_window that was saved before interruption. If either of the latter attributes + # change, then we reset the last_* values to 0. 
+ # + class Cursor + include Gitlab::Utils::StrongMemoize + + Window = Struct.new(:start_date, :end_date) + + CURSOR_REDIS_KEY_TTL = 7.days + + attr_reader :target_id, :usage_window, :last_used_by_project_id, :last_usage_count, :interrupted + + alias_method :interrupted?, :interrupted + + def initialize(redis_key:, target_scope:, usage_window:) + @redis_key = redis_key + @target_scope = target_scope + @usage_window = usage_window + @interrupted = false + + fetch_initial_attributes! + end + + def interrupt!(last_used_by_project_id:, last_usage_count:) + @last_used_by_project_id = last_used_by_project_id + @last_usage_count = last_usage_count + @interrupted = true + end + + def target_id=(target_id) + reset_last_usage_attributes if target_id != self.target_id + @target_id = target_id + end + + def advance + self.target_id += 1 + self.target_id = 0 if target_id > max_target_id + end + + def attributes + { + target_id: target_id, + usage_window: usage_window, + last_used_by_project_id: last_used_by_project_id, + last_usage_count: last_usage_count + } + end + + def max_target_id + target_scope.maximum(:id).to_i + end + strong_memoize_attr :max_target_id + + def save! + Gitlab::Redis::SharedState.with do |redis| + redis.set(redis_key, attributes.to_json, ex: CURSOR_REDIS_KEY_TTL) + end + end + + private + + attr_reader :redis_key, :target_scope + + def fetch_initial_attributes! + data = Gitlab::Redis::SharedState.with do |redis| + raw = redis.get(redis_key) + raw.present? ? 
Gitlab::Json.parse(raw) : {} + end.with_indifferent_access + + start_date = parse_date(data.dig(:usage_window, :start_date)) + end_date = parse_date(data.dig(:usage_window, :end_date)) + + @target_id = data[:target_id].to_i + @last_used_by_project_id = data[:last_used_by_project_id].to_i + @last_usage_count = data[:last_usage_count].to_i + + reset_last_usage_attributes if usage_window != Window.new(start_date, end_date) + end + + def reset_last_usage_attributes + @last_used_by_project_id = 0 + @last_usage_count = 0 + end + + def parse_date(date_str) + Date.parse(date_str) if date_str + end + end + end + end + end + end +end diff --git a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..f83f29939e37f563bc52698fcd7e823e86f09bd2 --- /dev/null +++ b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb @@ -0,0 +1,225 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Gitlab::Ci::Components::Usages::Aggregator, :clean_gitlab_redis_shared_state, :freeze_time, + feature_category: :pipeline_composition do + let_it_be(:usage_start_date) { Date.today - 30.days } + let_it_be(:usage_end_date) { Date.today - 1.day } + + let(:usage_model) { Ci::Catalog::Resources::Components::Usage } + let(:target_scope) { Ci::Catalog::Resource } + let(:group_by_column) { :catalog_resource_id } + let(:lease_key) { 'my_lease_key' } + + let(:usage_window) do + Gitlab::Ci::Components::Usages::Aggregators::Cursor::Window.new(usage_start_date, usage_end_date) + end + + before_all do + # First catalog resource: 3 components and 3 usages per component on usage_end_date + version = create(:ci_catalog_resource_version) + create_list(:ci_catalog_resource_component, 3, version: version).each do |component| + (1..3).each do |k| + create( + :ci_catalog_resource_component_usage, + component: component, + used_date: usage_end_date, + 
used_by_project_id: k + ) + end + end + + # Create 4 more catalog resources, each with 1-4 components and 0-6 usages + # per component on different dates before and after usage_end_date + create_list(:ci_catalog_resource_version, 4).each_with_index do |version, i| + create_list(:ci_catalog_resource_component, i + 1, version: version).each_with_index do |component, j| + next unless j > 0 + + (1..j * 2).each do |k| + create( + :ci_catalog_resource_component_usage, + component: component, + used_date: usage_end_date - 3.days + k.days, + used_by_project_id: k + ) + end + end + end + end + + describe '#each_batch' do + shared_examples 'when the runtime limit is not reached' do + it 'returns the expected result' do + # We process all catalog resources and advance the cursor + batched_usage_counts, result = run_new_aggregator_each_batch + + expect(batched_usage_counts).to eq(expected_batched_usage_counts) + expect(result.total_targets_completed).to eq(target_scope.count) + expect(result.cursor.attributes).to eq({ + target_id: 0, + usage_window: usage_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + + shared_examples 'with multiple distinct usage batches' do + before do + stub_const("#{described_class}::DISTINCT_USAGE_BATCH_SIZE", 2) + end + + it_behaves_like 'when the runtime limit is not reached' + + context 'when the runtime limit is reached' do + before do + # Sets the aggregator to break after the first iteration on each run + stub_const("#{described_class}::MAX_RUNTIME", 0) + end + + it 'returns the expected result for each run' do + # On 1st run, we get an incomplete usage count for the first catalog resource + batched_usage_counts, result = run_new_aggregator_each_batch + + expect(batched_usage_counts).to eq([]) + expect(result.total_targets_completed).to eq(0) + expect(result.cursor.attributes).to eq({ + target_id: target_scope.first.id, + usage_window: usage_window, + last_used_by_project_id: 2, + last_usage_count: 2 + }) + + # On 
2nd run, we get the complete usage count for the first catalog resource and advance the cursor + batched_usage_counts, result = run_new_aggregator_each_batch + + expect(batched_usage_counts).to eq([{ target_scope.first => 3 }]) + expect(result.total_targets_completed).to eq(1) + expect(result.cursor.attributes).to eq({ + target_id: target_scope.first.id + 1, + usage_window: usage_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + + all_batched_usage_counts = batched_usage_counts + repeat_new_aggregator_each_batch_until_done + batched_usage_counts_merged = all_batched_usage_counts.flatten.reduce(&:merge) + + expect(batched_usage_counts_merged.length).to eq(5) + expect(batched_usage_counts_merged).to eq(expected_batched_usage_counts_merged) + end + + context 'when a target is deleted between runs' do + it 'returns the expected result for each run' do + # On 1st run, we get an incomplete usage count for the first catalog resource + batched_usage_counts, result = run_new_aggregator_each_batch + + expect(batched_usage_counts).to eq([]) + expect(result.total_targets_completed).to eq(0) + expect(result.cursor.attributes).to eq({ + target_id: target_scope.first.id, + usage_window: usage_window, + last_used_by_project_id: 2, + last_usage_count: 2 + }) + + target_scope.first.delete + + all_batched_usage_counts = repeat_new_aggregator_each_batch_until_done + batched_usage_counts_merged = all_batched_usage_counts.reduce(&:merge) + + expect(batched_usage_counts_merged.length).to eq(4) + expect(batched_usage_counts_merged).to eq(expected_batched_usage_counts_merged) + end + end + + context 'when there are no usage records' do + it 'returns the expected result' do + usage_model.delete_all + + all_batched_usage_counts = repeat_new_aggregator_each_batch_until_done + batched_usage_counts_merged = all_batched_usage_counts.reduce(&:merge) + + expect(batched_usage_counts_merged.length).to eq(5) + expect(batched_usage_counts_merged).to 
eq(expected_batched_usage_counts_merged) + end + end + end + end + + it_behaves_like 'when the runtime limit is not reached' + it_behaves_like 'with multiple distinct usage batches' + + context 'with multiple target batches' do + before do + stub_const("#{described_class}::TARGET_BATCH_SIZE", 3) + end + + it_behaves_like 'when the runtime limit is not reached' + it_behaves_like 'with multiple distinct usage batches' + end + + it 'prevents parallel processing with an exclusive lease guard' do + lease = Gitlab::ExclusiveLease.new(lease_key, timeout: 1.minute).tap(&:try_obtain) + result = run_new_aggregator_each_batch.last + + expect(result).to be_nil + lease.cancel + end + end + + private + + def run_new_aggregator_each_batch + aggregator = described_class.new( + target_scope: target_scope, + group_by_column: group_by_column, + usage_start_date: usage_start_date, + usage_end_date: usage_end_date, + lease_key: lease_key + ) + + batched_usage_counts = [] + + result = aggregator.each_batch do |usage_counts| + batched_usage_counts << usage_counts + end + + [batched_usage_counts, result] + end + + def expected_batched_usage_counts + batched_usage_counts = [] + + target_scope.each_batch(of: described_class::TARGET_BATCH_SIZE) do |targets| + usage_counts = usage_model + .includes(:catalog_resource) + .select('catalog_resource_id, COUNT(DISTINCT used_by_project_id) AS usage_count') + .where(used_date: usage_start_date..usage_end_date) + .where(group_by_column => targets) + .group(:catalog_resource_id) + .each_with_object({}) { |r, hash| hash[r.catalog_resource] = r.usage_count } + + batched_usage_counts << targets.index_with { 0 }.merge(usage_counts) + end + + batched_usage_counts + end + + def expected_batched_usage_counts_merged + expected_batched_usage_counts.reduce(&:merge) + end + + def repeat_new_aggregator_each_batch_until_done + all_batched_usage_counts = [] + + 30.times do + batched_usage_counts, result = run_new_aggregator_each_batch + all_batched_usage_counts << 
batched_usage_counts + break if result.cursor.target_id == 0 + end + + all_batched_usage_counts.flatten + end +end diff --git a/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..d4d416e4e586bb20cb833fc388312feef0cbe303 --- /dev/null +++ b/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb @@ -0,0 +1,167 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Gitlab::Ci::Components::Usages::Aggregators::Cursor, :clean_gitlab_redis_shared_state, + feature_category: :pipeline_composition do + let(:redis_key) { 'my_redis_key:cursor' } + let(:target_scope) { class_double(Ci::Catalog::Resource, maximum: max_target_id) } + let(:max_target_id) { initial_redis_attributes[:target_id] } + + let(:usage_window) { described_class::Window.new(Date.parse('2024-01-08'), Date.parse('2024-01-14')) } + let(:initial_redis_usage_window) { usage_window } + + let(:initial_redis_attributes) do + { + target_id: 1, + usage_window: initial_redis_usage_window, + last_used_by_project_id: 100, + last_usage_count: 10 + } + end + + subject(:cursor) { described_class.new(redis_key: redis_key, target_scope: target_scope, usage_window: usage_window) } + + before do + Gitlab::Redis::SharedState.with do |redis| + redis.set(redis_key, initial_redis_attributes.to_json) + end + end + + describe '.new' do + it 'fetches and parses the attributes from Redis' do + expect(cursor.attributes).to eq(initial_redis_attributes) + end + + context 'when Redis usage_window is different than the given usage_window' do + let(:initial_redis_usage_window) do + described_class::Window.new(Date.parse('2024-01-01'), Date.parse('2024-01-07')) + end + + it 'resets last usage attributes' do + expect(cursor.attributes).to eq({ + target_id: initial_redis_attributes[:target_id], + usage_window: usage_window, + last_used_by_project_id: 0, + 
last_usage_count: 0 + }) + end + end + + context 'when cursor does not exist in Redis' do + before do + Gitlab::Redis::SharedState.with do |redis| + redis.del(redis_key) + end + end + + it 'sets target_id and last usage attributes to zero' do + expect(cursor.attributes).to eq({ + target_id: 0, + usage_window: usage_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + end + + describe '#interrupt!' do + it 'updates last usage attributes and sets interrupted? to true' do + expect(cursor.interrupted?).to eq(false) + + cursor.interrupt!( + last_used_by_project_id: initial_redis_attributes[:last_used_by_project_id] + 1, + last_usage_count: initial_redis_attributes[:last_usage_count] + 1 + ) + + expect(cursor.interrupted?).to eq(true) + expect(cursor.attributes).to eq({ + target_id: initial_redis_attributes[:target_id], + usage_window: usage_window, + last_used_by_project_id: initial_redis_attributes[:last_used_by_project_id] + 1, + last_usage_count: initial_redis_attributes[:last_usage_count] + 1 + }) + end + end + + describe '#target_id=(target_id)' do + context 'when new target_id is different from cursor target_id' do + it 'sets new target_id and resets last usage attributes' do + cursor.target_id = initial_redis_attributes[:target_id] + 1 + + expect(cursor.attributes).to eq({ + target_id: initial_redis_attributes[:target_id] + 1, + usage_window: usage_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + + context 'when new target_id is the same as cursor target_id' do + it 'does not change cursor attributes' do + expect(cursor.attributes).to eq(initial_redis_attributes) + end + end + end + + describe '#advance' do + context 'when cursor target_id is less than max_target_id' do + let(:max_target_id) { initial_redis_attributes[:target_id] + 100 } + + it 'increments cursor target_id and resets last usage attributes' do + cursor.advance + + expect(cursor.attributes).to eq({ + target_id: 
initial_redis_attributes[:target_id] + 1, + usage_window: usage_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + + context 'when cursor target_id is equal to or greater than max_target_id' do + it 'resets cursor target_id and last usage attributes' do + cursor.advance + + expect(cursor.attributes).to eq({ + target_id: 0, + usage_window: usage_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + end + + describe '#max_target_id' do + let(:target_scope) { Ci::Catalog::Resource } + + before_all do + create(:ci_catalog_resource, id: 123) + create(:ci_catalog_resource, id: 100) + end + + it 'returns maximum ID of the target scope' do + expect(cursor.max_target_id).to eq(123) + end + end + + describe '#save!' do + it 'saves cursor attributes to Redis as JSON' do + cursor.target_id = 11 + cursor.interrupt!( + last_used_by_project_id: 33, + last_usage_count: 22 + ) + + cursor.save! + data = Gitlab::Redis::SharedState.with { |redis| redis.get(redis_key) } + + expect(data).to eq('{"target_id":11,"usage_window":{"start_date":"2024-01-08","end_date":"2024-01-14"},' \ + '"last_used_by_project_id":33,"last_usage_count":22}') + end + end +end