From ba1972e6b642d14e64168ea142f2bc5bbbb8fa23 Mon Sep 17 00:00:00 2001 From: lma-git Date: Tue, 23 Apr 2024 10:09:46 -0700 Subject: [PATCH 1/4] Introduce component usage aggregator and Redis cursor Introduces new component usage aggregator and cursor classes to be used later in a service scheduled by a cron job. The objective is to run a regular worker that aggregates the CI component usage data in the background, avoiding long running queries. The Usages::Aggregator class iterates through the target scope in batches. For each target ID, it aggregates the component usage count in batches for the given window. It maintains a Redis cursor so that the counter can resume from where it left off on each run and avoid unnecessary reprocessing. We collect the count in Rails because the SQL query `COUNT(DISTINCT(*))` is not performant when the dataset is large. This class should be run with an exclusive lease for each unique cursor Redis key. --- app/models/ci/catalog/resource.rb | 1 + .../ci/catalog/resources/components/usage.rb | 1 + lib/gitlab/ci/components/usages/aggregator.rb | 118 +++++++++ .../usages/aggregators/cursors/base.rb | 138 +++++++++++ .../aggregators/cursors/rolling_window.rb | 31 +++ .../ci/components/usages/aggregator_spec.rb | 232 ++++++++++++++++++ .../usages/aggregators/cursors/base_spec.rb | 230 +++++++++++++++++ .../cursors/rolling_window_spec.rb | 104 ++++++++ 8 files changed, 855 insertions(+) create mode 100644 lib/gitlab/ci/components/usages/aggregator.rb create mode 100644 lib/gitlab/ci/components/usages/aggregators/cursors/base.rb create mode 100644 lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window.rb create mode 100644 spec/lib/gitlab/ci/components/usages/aggregator_spec.rb create mode 100644 spec/lib/gitlab/ci/components/usages/aggregators/cursors/base_spec.rb create mode 100644 spec/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window_spec.rb diff --git a/app/models/ci/catalog/resource.rb 
b/app/models/ci/catalog/resource.rb index b50aac09cedb38..77c5c61bb90996 100644 --- a/app/models/ci/catalog/resource.rb +++ b/app/models/ci/catalog/resource.rb @@ -11,6 +11,7 @@ class Resource < ::ApplicationRecord include PgFullTextSearchable include Gitlab::VisibilityLevel include Sortable + include EachBatch self.table_name = 'catalog_resources' diff --git a/app/models/ci/catalog/resources/components/usage.rb b/app/models/ci/catalog/resources/components/usage.rb index b721e57d0a75ea..07d785728e2b8f 100644 --- a/app/models/ci/catalog/resources/components/usage.rb +++ b/app/models/ci/catalog/resources/components/usage.rb @@ -9,6 +9,7 @@ module Components # to preserve historical usage data. class Usage < ::ApplicationRecord include PartitionedTable + include EachBatch self.table_name = 'p_catalog_resource_component_usages' self.primary_key = :id diff --git a/lib/gitlab/ci/components/usages/aggregator.rb b/lib/gitlab/ci/components/usages/aggregator.rb new file mode 100644 index 00000000000000..076407be24dc0c --- /dev/null +++ b/lib/gitlab/ci/components/usages/aggregator.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +module Gitlab + module Ci + module Components + module Usages + # Iterates through the target scope in batches. For each target ID, aggregates the component + # usage count in batches for the given window. Maintains a Redis cursor so that the counter + # can resume from where it left off on each run. We collect the count in Rails because the + # SQL query `COUNT(DISTINCT(*))` is not performant when the dataset is large. This service + # should be run with an exclusive lease for each unique `cursor_redis_key`. + # + # target_scope: + # - ActiveRecord relation from which to retrieve the target IDs. + # - Processed in order of ID ascending. The target model should have `include EachBatch`. + # + # group_by_column: + # - Column name in the usage table (foreign key of the target.) 
+ # + # each_batch: + # - Yields each batch of `usage_counts` to the block. + # - `usage_counts` format: { target_object1 => 100, target_object2 => 200, ... } + # - Returns a Result containing the `cursor` object and `total_targets_completed`. + # + # rubocop: disable CodeReuse/ActiveRecord -- Custom queries required for data processing + class Aggregator + include Gitlab::Utils::StrongMemoize + + Result = Struct.new(:cursor, :total_targets_completed, keyword_init: true) + + TARGET_BATCH_SIZE = 1000 + DISTINCT_USAGE_BATCH_SIZE = 100 + MAX_RUNTIME = 4.minutes # Should be >= scheduling frequency so there is no gap between job runs + + CURSOR_TYPES = { + rolling_window: Aggregators::Cursors::RollingWindow + }.freeze + + def initialize(target_scope:, group_by_column:, cursor_type:, cursor_redis_key:) + @target_scope = target_scope + @group_by_column = group_by_column + @cursor_type = cursor_type + @cursor_redis_key = cursor_redis_key + end + + def each_batch(&usage_counts_block) + @runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(MAX_RUNTIME) + @cursor = CURSOR_TYPES[cursor_type].new(cursor_redis_key, target_scope: target_scope) + + total_targets_completed = process_targets(&usage_counts_block) + + Result.new(cursor: cursor, total_targets_completed: total_targets_completed) + end + + private + + attr_reader :target_scope, :group_by_column, :cursor_type, :cursor_redis_key, :runtime_limiter, :cursor + + def process_targets + return 0 unless cursor.data_ready? + + total_targets_completed = 0 + restored_target_scope = target_scope.where('id >= ?', cursor.target_id) + + restored_target_scope.each_batch(of: TARGET_BATCH_SIZE) do |targets_relation| + usage_counts = aggregate_usage_counts(targets_relation) + + yield usage_counts if usage_counts.present? + + total_targets_completed += usage_counts.length + break if runtime_limiter.over_time? + end + + # Only advance the cursor if we're not in the middle of a count + cursor.advance unless cursor.changed? 
&& cursor.last_usage_count > 0 + cursor.save! + + total_targets_completed + end + + def aggregate_usage_counts(targets_relation) + usage_counts = {} + + targets_relation.order(:id).each do |target| + cursor.target_id = target.id + + usage_scope = ::Ci::Catalog::Resources::Components::Usage + .where(group_by_column => cursor.target_id) + .where(used_date: cursor.window.start_date..cursor.window.end_date) + + restored_usage_scope = usage_scope.where('used_by_project_id > ?', cursor.last_used_by_project_id) + usage_counts[target] = cursor.last_usage_count + + restored_usage_scope + .distinct_each_batch(column: :used_by_project_id, of: DISTINCT_USAGE_BATCH_SIZE) do |usages_relation| + count = usages_relation.count + usage_counts[target] += count + + if runtime_limiter.over_time? && count == DISTINCT_USAGE_BATCH_SIZE + cursor.last_used_by_project_id = usages_relation.maximum(:used_by_project_id).to_i + cursor.last_usage_count = usage_counts[target] + + usage_counts.delete(target) # Remove the incomplete count + break + end + end + + break if runtime_limiter.over_time? + end + + usage_counts + end + end + # rubocop: enable CodeReuse/ActiveRecord + end + end + end +end diff --git a/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb b/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb new file mode 100644 index 00000000000000..237d6f0586f243 --- /dev/null +++ b/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb @@ -0,0 +1,138 @@ +# frozen_string_literal: true + +module Gitlab + module Ci + module Components + module Usages + module Aggregators + module Cursors + class Base + include Gitlab::Utils::StrongMemoize + + Window = Struct.new(:start_date, :end_date) + + CURSOR_REDIS_KEY_TTL = 7.days + + attr_reader :target_id, :window + attr_accessor :last_used_by_project_id, :last_usage_count + + def initialize(redis_key, target_scope:) + @redis_key = redis_key + @target_scope = target_scope + + fetch_attributes! 
+ + @initial_attributes = attributes.deep_dup + end + + def target_id=(target_id) + reset_last_usage_count if target_id != self.target_id + @target_id = target_id + end + + def advance + if target_id < max_target_id + self.target_id += 1 + else + self.target_id = 0 + self.window = next_window + end + end + + def data_ready? + window.end_date < today + end + + def changed? + initial_attributes != attributes + end + + def attributes + { + target_id: target_id, + window: window, + last_used_by_project_id: last_used_by_project_id, + last_usage_count: last_usage_count + } + end + + def max_target_id + target_scope.maximum(:id).to_i + end + strong_memoize_attr :max_target_id + + def save! + Gitlab::Redis::SharedState.with do |redis| + redis.set(redis_key, attributes.to_json, ex: CURSOR_REDIS_KEY_TTL) + end + end + + private + + attr_reader :redis_key, :target_scope, :initial_attributes + + def window=(window) + reset_last_usage_count if window != self.window + @window = window + end + + def reset_last_usage_count + self.last_used_by_project_id = 0 + self.last_usage_count = 0 + end + + def fetch_attributes! + data = Gitlab::Redis::SharedState.with do |redis| + raw = redis.get(redis_key) + raw.present? ? Gitlab::Json.parse(raw) : {} + end.with_indifferent_access + + start_date = parse_date(data.dig(:window, :start_date)) + end_date = parse_date(data.dig(:window, :end_date)) + + @target_id = data[:target_id] || default_target_id + @window = start_date && end_date ? Window.new(start_date, end_date) : default_window + @last_used_by_project_id = data[:last_used_by_project_id].to_i + @last_usage_count = data[:last_usage_count].to_i + + self.window = default_window if use_default_window_when_lagging? 
&& window.end_date < yesterday + end + + # This time zone should be consistent with + # `Ci::Catalog::Resources::Components::Usage#set_used_date` + def today + Date.today + end + strong_memoize_attr :today + + def yesterday + Date.today - 1.day + end + strong_memoize_attr :yesterday + + def parse_date(date_str) + Date.parse(date_str) if date_str + end + + def default_target_id + 0 + end + + def use_default_window_when_lagging? + false + end + + def default_window + raise NotImplementedError + end + + def next_window + raise NotImplementedError + end + end + end + end + end + end + end +end diff --git a/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window.rb b/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window.rb new file mode 100644 index 00000000000000..74d0d722599176 --- /dev/null +++ b/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +module Gitlab + module Ci + module Components + module Usages + module Aggregators + module Cursors + class RollingWindow < Base + WINDOW_LENGTH = 30.days + + private + + def use_default_window_when_lagging? 
+ true + end + + def default_window + Window.new(today - WINDOW_LENGTH, today - 1.day) + end + + def next_window + Window.new(window.start_date + 1.day, window.end_date + 1.day) + end + end + end + end + end + end + end +end diff --git a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb new file mode 100644 index 00000000000000..2a7e9649cfa134 --- /dev/null +++ b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb @@ -0,0 +1,232 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Gitlab::Ci::Components::Usages::Aggregator, :clean_gitlab_redis_shared_state, :freeze_time, + feature_category: :pipeline_composition do + let_it_be(:today) { Date.today } + let_it_be(:yesterday) { today - 1.day } + + let(:usage_model) { Ci::Catalog::Resources::Components::Usage } + let(:target_scope) { Ci::Catalog::Resource } + let(:group_by_column) { :catalog_resource_id } + let(:cursor_redis_key) { 'my_redis_key:cursor' } + let(:advanced_cursor_attributes) do + { + target_id: 0, + window: next_window, + last_used_by_project_id: 0, + last_usage_count: 0 + } + end + + subject(:aggregator) do + described_class.new( + target_scope: target_scope, + group_by_column: group_by_column, + cursor_type: cursor_type, + cursor_redis_key: cursor_redis_key + ) + end + + describe 'with cursor_type: :rolling_window' do + let(:cursor_type) { :rolling_window } + + let_it_be(:thirty_days_ago) { today - 30.days } + let_it_be(:cursor_class) { Gitlab::Ci::Components::Usages::Aggregators::Cursors::RollingWindow } + + let_it_be(:initial_redis_window) { cursor_class::Window.new(thirty_days_ago - 1.day, yesterday - 1.day) } + let_it_be(:default_window) { cursor_class::Window.new(thirty_days_ago, yesterday) } + let_it_be(:next_window) { cursor_class::Window.new(thirty_days_ago + 1.day, today) } + + before_all do + # First catalog resource: 3 components and 3 usages per component within the default window + version = 
create(:ci_catalog_resource_version) + create_list(:ci_catalog_resource_component, 3, version: version).each do |component| + (1..3).each do |k| + create( + :ci_catalog_resource_component_usage, + component: component, + used_date: default_window.end_date, + used_by_project_id: k + ) + end + end + + # Create 4 more catalog resources, each with 1-4 components and 0-6 usages per component across different windows + create_list(:ci_catalog_resource_version, 4).each_with_index do |version, i| + create_list(:ci_catalog_resource_component, i + 1, version: version).each_with_index do |component, j| + next unless j > 0 + + (1..j * 2).each do |k| + create( + :ci_catalog_resource_component_usage, + component: component, + used_date: initial_redis_window.end_date - 3.days + k.days, + used_by_project_id: k + ) + end + end + end + end + + shared_examples 'when the runtime limit is not reached' do + it 'returns the expected result for each run' do + # On 1st run, we process all catalog resources and advance the cursor + batched_usage_counts, result = run_aggregator_each_batch + + expect(batched_usage_counts).to eq(expected_batched_usage_counts) + expect(result.total_targets_completed).to eq(target_scope.count) + expect(result.cursor.attributes).to eq(advanced_cursor_attributes) + + # On 2nd run, it's not tomorrow yet, so the cursor remains unchanged + batched_usage_counts, result = run_aggregator_each_batch + + expect(batched_usage_counts).to eq([]) + expect(result.total_targets_completed).to eq(0) + expect(result.cursor.attributes).to eq(advanced_cursor_attributes) + end + end + + shared_examples 'with multiple distinct usage batches' do + before do + stub_const("#{described_class}::DISTINCT_USAGE_BATCH_SIZE", 2) + end + + it_behaves_like 'when the runtime limit is not reached' + + context 'when the runtime limit is reached' do + before do + # Sets the aggregator to break after the first iteration on each run + stub_const("#{described_class}::MAX_RUNTIME", 0) + end + + it 
'returns the expected result for each run' do + # On 1st run, we get an incomplete usage count for the first catalog resource + batched_usage_counts, result = run_aggregator_each_batch + + expect(batched_usage_counts).to eq([]) + expect(result.total_targets_completed).to eq(0) + expect(result.cursor.attributes).to eq({ + target_id: target_scope.first.id, + window: default_window, + last_used_by_project_id: 2, + last_usage_count: 2 + }) + + # On 2nd run, we get the complete usage count for the first catalog resource and advance the cursor + batched_usage_counts, result = run_aggregator_each_batch + + expect(batched_usage_counts).to eq([{ target_scope.first => 3 }]) + expect(result.total_targets_completed).to eq(1) + expect(result.cursor.attributes).to eq({ + target_id: target_scope.first.id + 1, + window: default_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + + all_batched_usage_counts = batched_usage_counts + repeat_aggregator_each_batch_until_done + batched_usage_counts_merged = all_batched_usage_counts.flatten.reduce(&:merge) + + expect(batched_usage_counts_merged.length).to eq(5) + expect(batched_usage_counts_merged).to eq(expected_batched_usage_counts_merged) + end + + context 'when a target is deleted between runs' do + it 'returns the expected result for each run' do + # On 1st run, we get an incomplete usage count for the first catalog resource + batched_usage_counts, result = run_aggregator_each_batch + + expect(batched_usage_counts).to eq([]) + expect(result.total_targets_completed).to eq(0) + expect(result.cursor.attributes).to eq({ + target_id: target_scope.first.id, + window: default_window, + last_used_by_project_id: 2, + last_usage_count: 2 + }) + + target_scope.first.delete + + all_batched_usage_counts = repeat_aggregator_each_batch_until_done + batched_usage_counts_merged = all_batched_usage_counts.reduce(&:merge) + + expect(batched_usage_counts_merged.length).to eq(4) + expect(batched_usage_counts_merged).to 
eq(expected_batched_usage_counts_merged) + end + end + + context 'when there are no usage records' do + it 'returns the expected result' do + usage_model.delete_all + + all_batched_usage_counts = repeat_aggregator_each_batch_until_done + batched_usage_counts_merged = all_batched_usage_counts.reduce(&:merge) + + expect(batched_usage_counts_merged.length).to eq(5) + expect(batched_usage_counts_merged).to eq(expected_batched_usage_counts_merged) + end + end + end + end + + it_behaves_like 'when the runtime limit is not reached' + it_behaves_like 'with multiple distinct usage batches' + + context 'with multiple target batches' do + before do + stub_const("#{described_class}::TARGET_BATCH_SIZE", 3) + end + + it_behaves_like 'when the runtime limit is not reached' + it_behaves_like 'with multiple distinct usage batches' + end + end + + private + + def run_aggregator_each_batch + batched_usage_counts = [] + + result = aggregator.each_batch do |usage_counts| + batched_usage_counts << usage_counts + end + + [batched_usage_counts, result] + end + + def expected_batched_usage_counts + batched_usage_counts = [] + + target_scope.each_batch(of: described_class::TARGET_BATCH_SIZE) do |targets| + usage_counts = usage_model + .includes(:catalog_resource) + .select('catalog_resource_id, COUNT(DISTINCT used_by_project_id) AS usage_count') + .where(used_date: default_window.start_date..default_window.end_date) + .where(group_by_column => targets) + .group(:catalog_resource_id) + .each_with_object({}) { |r, hash| hash[r.catalog_resource] = r.usage_count } + + batched_usage_counts << targets.index_with { 0 }.merge(usage_counts) + end + + batched_usage_counts + end + + def expected_batched_usage_counts_merged + expected_batched_usage_counts.reduce(&:merge) + end + + def repeat_aggregator_each_batch_until_done + all_batched_usage_counts = [] + + 30.times do + batched_usage_counts, result = run_aggregator_each_batch + all_batched_usage_counts << batched_usage_counts + break if 
result.cursor.window == next_window + end + + all_batched_usage_counts.flatten + end +end diff --git a/spec/lib/gitlab/ci/components/usages/aggregators/cursors/base_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregators/cursors/base_spec.rb new file mode 100644 index 00000000000000..0bc184fd486cee --- /dev/null +++ b/spec/lib/gitlab/ci/components/usages/aggregators/cursors/base_spec.rb @@ -0,0 +1,230 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Gitlab::Ci::Components::Usages::Aggregators::Cursors::Base, :clean_gitlab_redis_shared_state, + feature_category: :pipeline_composition do + let(:initial_redis_window) { described_class::Window.new(Date.parse('2024-01-01'), Date.parse('2024-01-07')) } + let(:default_window) { described_class::Window.new(Date.parse('2024-01-08'), Date.parse('2024-01-14')) } + let(:next_window) { described_class::Window.new(Date.parse('2024-01-15'), Date.parse('2024-01-21')) } + + let(:cursor_class) do + Class.new(described_class) do + def default_window + self.class::Window.new(Date.parse('2024-01-08'), Date.parse('2024-01-14')) # default_window + end + + def next_window + self.class::Window.new(Date.parse('2024-01-15'), Date.parse('2024-01-21')) # next_window + end + end + end + + let(:redis_key) { 'my_redis_key:cursor' } + let(:target_scope) { class_double(Ci::Catalog::Resource, maximum: max_target_id) } + let(:max_target_id) { initial_redis_attributes[:target_id] } + + let(:initial_redis_attributes) do + { + target_id: 1, + window: initial_redis_window, + last_used_by_project_id: 100, + last_usage_count: 10 + } + end + + subject(:cursor) { cursor_class.new(redis_key, target_scope: target_scope) } + + before do + Gitlab::Redis::SharedState.with do |redis| + redis.set(redis_key, initial_redis_attributes.to_json) + end + end + + describe '.new' do + it 'fetches and parses the attributes from Redis' do + expect(cursor.attributes).to eq(initial_redis_attributes) + end + + context 'when cursor does not exist 
in Redis' do + before do + Gitlab::Redis::SharedState.with do |redis| + redis.del(redis_key) + end + end + + it 'sets cursor attributes to their default values' do + expect(cursor.attributes).to eq({ + target_id: 0, + window: default_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + + context 'when `default_window` is not implemented' do + let(:cursor_class) do + Class.new(described_class) do + def next_window + self.class::Window.new(Date.parse('2024-01-15'), Date.parse('2024-01-21')) + end + end + end + + it 'raises NotImplementedError' do + expect { cursor }.to raise_error(NotImplementedError) + end + end + end + end + + describe '#target_id=(target_id)' do + context 'when new target_id is different from cursor target_id' do + it 'sets new target_id and resets last usage count' do + cursor.target_id = initial_redis_attributes[:target_id] + 1 + + expect(cursor.attributes).to eq({ + target_id: initial_redis_attributes[:target_id] + 1, + window: initial_redis_attributes[:window], + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + + context 'when new target_id is the same as cursor target_id' do + it 'does not change cursor attributes' do + expect(cursor.attributes).to eq(initial_redis_attributes) + end + end + end + + describe '#advance' do + context 'when cursor target_id is less than max_target_id' do + let(:max_target_id) { initial_redis_attributes[:target_id] + 100 } + + it 'increments cursor target_id and resets last usage count' do + cursor.advance + + expect(cursor.attributes).to eq({ + target_id: initial_redis_attributes[:target_id] + 1, + window: initial_redis_attributes[:window], + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + + context 'when cursor target_id is equal to or greater than max_target_id' do + it 'resets cursor target_id, resets last usage count, and sets window to next window' do + cursor.advance + + expect(cursor.attributes).to eq({ + target_id: 0, + window: next_window, + 
last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + + context 'when `next_window` is not implemented' do + let(:cursor_class) do + Class.new(described_class) do + def default_window + self.class::Window.new(Date.parse('2024-01-08'), Date.parse('2024-01-14')) + end + end + end + + it 'raises NotImplementedError' do + expect { cursor.advance }.to raise_error(NotImplementedError) + end + end + end + end + + describe '#data_ready?', :freeze_time do + subject { cursor.data_ready? } + + context 'when cursor window end_date is yesterday' do + it 'returns true' do + travel_to((initial_redis_window.end_date + 1.day).to_time) + + is_expected.to eq(true) + end + end + + context 'when cursor window end_date is today' do + it 'returns false' do + travel_to(initial_redis_window.end_date.to_time) + + is_expected.to eq(false) + end + end + + context 'when cursor window end_date is tomorrow' do + it 'returns false' do + travel_to((initial_redis_window.end_date - 1.day).to_time) + + is_expected.to eq(false) + end + end + end + + describe '#changed?' do + subject { cursor.changed? } + + it { is_expected.to eq(false) } + + context 'when cursor target_id is updated' do + it 'returns true' do + cursor.target_id += 1 + + is_expected.to eq(true) + end + end + + context 'when cursor last_used_by_project_id is updated' do + it 'returns true' do + cursor.last_used_by_project_id += 1 + + is_expected.to eq(true) + end + end + + context 'when cursor last_usage_count is updated' do + it 'returns true' do + cursor.last_usage_count += 1 + + is_expected.to eq(true) + end + end + end + + describe '#max_target_id' do + let(:target_scope) { Ci::Catalog::Resource } + + before_all do + create(:ci_catalog_resource, id: 123) + create(:ci_catalog_resource, id: 100) + end + + it 'returns maximum ID of the target scope' do + expect(cursor.max_target_id).to eq(123) + end + end + + describe '#save!' 
do + it 'saves cursor attributes to Redis as JSON' do + cursor.target_id = 11 + cursor.last_used_by_project_id = 33 + cursor.last_usage_count = 22 + cursor.save! + + data = Gitlab::Redis::SharedState.with { |redis| redis.get(redis_key) } + + expect(data).to eq('{"target_id":11,"window":{"start_date":"2024-01-01","end_date":"2024-01-07"},' \ + '"last_used_by_project_id":33,"last_usage_count":22}') + end + end +end diff --git a/spec/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window_spec.rb new file mode 100644 index 00000000000000..845c0d65263392 --- /dev/null +++ b/spec/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window_spec.rb @@ -0,0 +1,104 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Gitlab::Ci::Components::Usages::Aggregators::Cursors::RollingWindow, :clean_gitlab_redis_shared_state, + :freeze_time, feature_category: :pipeline_composition do + let(:today) { Date.today } + let(:yesterday) { today - 1.day } + let(:thirty_days_ago) { today - 30.days } + + let(:initial_redis_window) { described_class::Window.new(thirty_days_ago - 1.day, yesterday - 1.day) } + let(:default_window) { described_class::Window.new(thirty_days_ago, yesterday) } + let(:next_window) { described_class::Window.new(thirty_days_ago + 1.day, today) } + + let(:redis_key) { 'my_redis_key:cursor' } + let(:target_scope) { class_double(Ci::Catalog::Resource, maximum: max_target_id) } + let(:max_target_id) { initial_redis_attributes[:target_id] } + + let(:initial_redis_attributes) do + { + target_id: 1, + window: initial_redis_window, + last_used_by_project_id: 100, + last_usage_count: 10 + } + end + + subject(:cursor) { described_class.new(redis_key, target_scope: target_scope) } + + before do + Gitlab::Redis::SharedState.with do |redis| + redis.set(redis_key, initial_redis_attributes.to_json) + end + end + + describe '.new' do + context 'when cursor 
window end_date is older than yesterday' do + it 'sets cursor window to default and resets last usage count' do + expect(cursor.attributes).to eq({ + target_id: initial_redis_attributes[:target_id], + window: default_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + + context 'when cursor window end_date is yesterday' do + it 'does not change cursor attributes' do + travel_to((initial_redis_window.end_date + 1.day).to_time) + + expect(cursor.attributes).to eq(initial_redis_attributes) + end + end + + context 'when cursor window end_date is today' do + it 'does not change cursor attributes' do + travel_to(initial_redis_window.end_date.to_time) + + expect(cursor.attributes).to eq(initial_redis_attributes) + end + end + + context 'when cursor window end_date is tomorrow' do + it 'does not change cursor attributes' do + travel_to((initial_redis_window.end_date - 1.day).to_time) + + expect(cursor.attributes).to eq(initial_redis_attributes) + end + end + + context 'when cursor does not exist in Redis' do + before do + Gitlab::Redis::SharedState.with do |redis| + redis.del(redis_key) + end + end + + it 'sets cursor attributes to their default values' do + expect(cursor.attributes).to eq({ + target_id: 0, + window: default_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + end + + describe '#advance' do + context 'when cursor target_id is equal to or greater than max_target_id' do + it 'resets cursor target_id, resets last usage count, and sets window to next window' do + cursor.advance + + expect(cursor.attributes).to eq({ + target_id: 0, + window: next_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + end +end -- GitLab From da088a702209844debad21db62104c31c5ecd9f5 Mon Sep 17 00:00:00 2001 From: lma-git Date: Mon, 6 May 2024 12:03:21 -0700 Subject: [PATCH 2/4] Minor refactoring and added comments Minor refactoring and added comments --- lib/gitlab/ci/components/usages/aggregator.rb | 42 
++++++++++++------- .../usages/aggregators/cursors/base.rb | 30 ++++++------- .../ci/components/usages/aggregator_spec.rb | 30 +++++++------ 3 files changed, 56 insertions(+), 46 deletions(-) diff --git a/lib/gitlab/ci/components/usages/aggregator.rb b/lib/gitlab/ci/components/usages/aggregator.rb index 076407be24dc0c..50c1134c15e7e2 100644 --- a/lib/gitlab/ci/components/usages/aggregator.rb +++ b/lib/gitlab/ci/components/usages/aggregator.rb @@ -4,18 +4,25 @@ module Gitlab module Ci module Components module Usages - # Iterates through the target scope in batches. For each target ID, aggregates the component - # usage count in batches for the given window. Maintains a Redis cursor so that the counter - # can resume from where it left off on each run. We collect the count in Rails because the - # SQL query `COUNT(DISTINCT(*))` is not performant when the dataset is large. This service - # should be run with an exclusive lease for each unique `cursor_redis_key`. + # Iterates through the target scope in batches. For each target ID, aggregates the component usage count + # (# of unique `used_by_project_id`s) in batches for the given window. Maintains a Redis cursor so that + # the counter can resume from where it left off on each run. We collect the count in Rails because the + # SQL query `COUNT(DISTINCT(*))` is not performant when the dataset is large. This service should be run + # with an exclusive lease for each unique `cursor_redis_key`. # # target_scope: # - ActiveRecord relation from which to retrieve the target IDs. # - Processed in order of ID ascending. The target model should have `include EachBatch`. - # # group_by_column: - # - Column name in the usage table (foreign key of the target.) + # - Column name in the usage table to aggregate the data with. It should be the foreign + # key of the target_scope. e.g. If target_scope = Ci::Catalog::Resource, then + # group_by_column = :catalog_resource_id. + # cursor_type: + # - Type of cursor to use. e.g. 
`:rolling_window` creates a cursor that always sets + # the usage data window to the last 30 days. + # cursor_redis_key: + # - Redis key to save the cursor attributes. It should be unique for each + # target_scope/group_by_column/cursor_type combination. # # each_batch: # - Yields each batch of `usage_counts` to the block. @@ -30,7 +37,7 @@ class Aggregator TARGET_BATCH_SIZE = 1000 DISTINCT_USAGE_BATCH_SIZE = 100 - MAX_RUNTIME = 4.minutes # Should be >= scheduling frequency so there is no gap between job runs + MAX_RUNTIME = 4.minutes # Should be >= job scheduling frequency so there is no gap between job runs CURSOR_TYPES = { rolling_window: Aggregators::Cursors::RollingWindow @@ -39,14 +46,11 @@ class Aggregator def initialize(target_scope:, group_by_column:, cursor_type:, cursor_redis_key:) @target_scope = target_scope @group_by_column = group_by_column - @cursor_type = cursor_type - @cursor_redis_key = cursor_redis_key + @cursor = CURSOR_TYPES[cursor_type].new(cursor_redis_key, target_scope: target_scope) + @runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(MAX_RUNTIME) end def each_batch(&usage_counts_block) - @runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(MAX_RUNTIME) - @cursor = CURSOR_TYPES[cursor_type].new(cursor_redis_key, target_scope: target_scope) - total_targets_completed = process_targets(&usage_counts_block) Result.new(cursor: cursor, total_targets_completed: total_targets_completed) @@ -54,12 +58,13 @@ def each_batch(&usage_counts_block) private - attr_reader :target_scope, :group_by_column, :cursor_type, :cursor_redis_key, :runtime_limiter, :cursor + attr_reader :target_scope, :group_by_column, :runtime_limiter, :cursor def process_targets - return 0 unless cursor.data_ready? - total_targets_completed = 0 + return total_targets_completed unless cursor.data_ready? 
+ + # Restore the scope from cursor so we can resume from the last run restored_target_scope = target_scope.where('id >= ?', cursor.target_id) restored_target_scope.each_batch(of: TARGET_BATCH_SIZE) do |targets_relation| @@ -82,12 +87,15 @@ def aggregate_usage_counts(targets_relation) usage_counts = {} targets_relation.order(:id).each do |target| + # When target.id is different from the cursor's target_id, it + # resets last_usage_count and last_used_by_project_id to 0 cursor.target_id = target.id usage_scope = ::Ci::Catalog::Resources::Components::Usage .where(group_by_column => cursor.target_id) .where(used_date: cursor.window.start_date..cursor.window.end_date) + # Restore the scope from cursor so we can resume from the last run restored_usage_scope = usage_scope.where('used_by_project_id > ?', cursor.last_used_by_project_id) usage_counts[target] = cursor.last_usage_count @@ -96,6 +104,8 @@ def aggregate_usage_counts(targets_relation) count = usages_relation.count usage_counts[target] += count + # If we have less than the batch size of usage records, we're done for + # the current scope and we don't need to save the last usage count. if runtime_limiter.over_time? && count == DISTINCT_USAGE_BATCH_SIZE cursor.last_used_by_project_id = usages_relation.maximum(:used_by_project_id).to_i cursor.last_usage_count = usage_counts[target] diff --git a/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb b/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb index 237d6f0586f243..e75e9e8ad3ad8c 100644 --- a/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb +++ b/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb @@ -6,6 +6,8 @@ module Components module Usages module Aggregators module Cursors + # This class represents a Redis cursor that keeps track of the processing + # position and data window in Gitlab::Ci::Components::Usages::Aggregator. 
class Base include Gitlab::Utils::StrongMemoize @@ -20,9 +22,7 @@ def initialize(redis_key, target_scope:) @redis_key = redis_key @target_scope = target_scope - fetch_attributes! - - @initial_attributes = attributes.deep_dup + fetch_initial_attributes! end def target_id=(target_id) @@ -71,17 +71,7 @@ def save! attr_reader :redis_key, :target_scope, :initial_attributes - def window=(window) - reset_last_usage_count if window != self.window - @window = window - end - - def reset_last_usage_count - self.last_used_by_project_id = 0 - self.last_usage_count = 0 - end - - def fetch_attributes! + def fetch_initial_attributes! data = Gitlab::Redis::SharedState.with do |redis| raw = redis.get(redis_key) raw.present? ? Gitlab::Json.parse(raw) : {} @@ -96,6 +86,18 @@ def fetch_attributes! @last_usage_count = data[:last_usage_count].to_i self.window = default_window if use_default_window_when_lagging? && window.end_date < yesterday + + @initial_attributes = attributes.deep_dup + end + + def window=(window) + reset_last_usage_count if window != self.window + @window = window + end + + def reset_last_usage_count + self.last_used_by_project_id = 0 + self.last_usage_count = 0 end # This time zone should be consistent with diff --git a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb index 2a7e9649cfa134..26fde90543dc08 100644 --- a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb +++ b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb @@ -20,15 +20,6 @@ } end - subject(:aggregator) do - described_class.new( - target_scope: target_scope, - group_by_column: group_by_column, - cursor_type: cursor_type, - cursor_redis_key: cursor_redis_key - ) - end - describe 'with cursor_type: :rolling_window' do let(:cursor_type) { :rolling_window } @@ -73,14 +64,14 @@ shared_examples 'when the runtime limit is not reached' do it 'returns the expected result for each run' do # On 1st run, we process all catalog resources 
and advance the cursor - batched_usage_counts, result = run_aggregator_each_batch + batched_usage_counts, result = run_new_aggregator_each_batch expect(batched_usage_counts).to eq(expected_batched_usage_counts) expect(result.total_targets_completed).to eq(target_scope.count) expect(result.cursor.attributes).to eq(advanced_cursor_attributes) # On 2nd run, it's not tomorrow yet, so the cursor remains unchanged - batched_usage_counts, result = run_aggregator_each_batch + batched_usage_counts, result = run_new_aggregator_each_batch expect(batched_usage_counts).to eq([]) expect(result.total_targets_completed).to eq(0) @@ -103,7 +94,7 @@ it 'returns the expected result for each run' do # On 1st run, we get an incomplete usage count for the first catalog resource - batched_usage_counts, result = run_aggregator_each_batch + batched_usage_counts, result = run_new_aggregator_each_batch expect(batched_usage_counts).to eq([]) expect(result.total_targets_completed).to eq(0) @@ -115,7 +106,7 @@ }) # On 2nd run, we get the complete usage count for the first catalog resource and advance the cursor - batched_usage_counts, result = run_aggregator_each_batch + batched_usage_counts, result = run_new_aggregator_each_batch expect(batched_usage_counts).to eq([{ target_scope.first => 3 }]) expect(result.total_targets_completed).to eq(1) @@ -136,7 +127,7 @@ context 'when a target is deleted between runs' do it 'returns the expected result for each run' do # On 1st run, we get an incomplete usage count for the first catalog resource - batched_usage_counts, result = run_aggregator_each_batch + batched_usage_counts, result = run_new_aggregator_each_batch expect(batched_usage_counts).to eq([]) expect(result.total_targets_completed).to eq(0) @@ -186,7 +177,14 @@ private - def run_aggregator_each_batch + def run_new_aggregator_each_batch + aggregator = described_class.new( + target_scope: target_scope, + group_by_column: group_by_column, + cursor_type: cursor_type, + cursor_redis_key: 
cursor_redis_key + ) + batched_usage_counts = [] result = aggregator.each_batch do |usage_counts| @@ -222,7 +220,7 @@ def repeat_aggregator_each_batch_until_done all_batched_usage_counts = [] 30.times do - batched_usage_counts, result = run_aggregator_each_batch + batched_usage_counts, result = run_new_aggregator_each_batch all_batched_usage_counts << batched_usage_counts break if result.cursor.window == next_window end -- GitLab From 562fd32ed36fe2d34f8100474e037e5b9ef4485e Mon Sep 17 00:00:00 2001 From: lma-git Date: Tue, 7 May 2024 15:32:01 -0700 Subject: [PATCH 3/4] Add more clarifying comments and add lease guard Add more clarifying comments and add lease guard. --- lib/gitlab/ci/components/usages/aggregator.rb | 95 +++++++++++++------ .../usages/aggregators/cursors/base.rb | 10 +- .../ci/components/usages/aggregator_spec.rb | 12 ++- .../usages/aggregators/cursors/base_spec.rb | 18 ++-- 4 files changed, 92 insertions(+), 43 deletions(-) diff --git a/lib/gitlab/ci/components/usages/aggregator.rb b/lib/gitlab/ci/components/usages/aggregator.rb index 50c1134c15e7e2..92e2ff03a0bdf5 100644 --- a/lib/gitlab/ci/components/usages/aggregator.rb +++ b/lib/gitlab/ci/components/usages/aggregator.rb @@ -4,61 +4,95 @@ module Gitlab module Ci module Components module Usages - # Iterates through the target scope in batches. For each target ID, aggregates the component usage count - # (# of unique `used_by_project_id`s) in batches for the given window. Maintains a Redis cursor so that - # the counter can resume from where it left off on each run. We collect the count in Rails because the - # SQL query `COUNT(DISTINCT(*))` is not performant when the dataset is large. This service should be run - # with an exclusive lease for each unique `cursor_redis_key`. + # Component usage is defined as the number of unique `used_by_project_id`s in the table + # `p_catalog_resource_component_usages` for a given scope. + # + # This aggregator iterates through the target scope in batches. 
For each target ID, it + # collects the usage count using `distinct_each_batch` for the given window. It utilizes + # a Redis cursor so that the counter can resume from where it left off on each run or + # advance to the next target/window. We collect the count in Rails because the SQL query + # `COUNT(DISTINCT(*))` is not performant when the dataset is large. + # + ##### Usage + # + # each_batch: + # - Yields each batch of `usage_counts` to the given block. + # - `usage_counts` format: { target_object1 => 100, target_object2 => 200, ... } + # - If the lease is obtained, returns a Result containing the `cursor` object and + # `total_targets_completed`. Otherwise, returns nil. + # + # Example: + # aggregator = Gitlab::Ci::Components::Usages::Aggregator.new( + # target_scope: Ci::Catalog::Resource, + # group_by_column: :catalog_resource_id, + # cursor_type: :rolling_window, + # lease_key: 'my_aggregator_service_lease_key' + # ) + # + # result = aggregator.each_batch do |usage_counts| + # # Bulk update usage counts in the database + # end + # + ##### Runtime, Exclusive Lease, and Worker + # + # The batch processing is limited to MAX_RUNTIME, but the actual total runtime will be + # slightly longer as it depends on the execution time of `&usage_counts_block`. + # This aggregator is protected from parallel processing with an exclusive lease guard. + # The worker running this service should be scheduled at the same cadence as MAX_RUNTIME + # so that the next job can be immediately queued after the current one completes. + # It should also have: + # deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: LEASE_TIMEOUT + # + ##### Parameters # # target_scope: - # - ActiveRecord relation from which to retrieve the target IDs. - # - Processed in order of ID ascending. The target model should have `include EachBatch`. + # - ActiveRecord relation to retrieve the target IDs. Processed in order of ID ascending. 
+ # - The target model class should have `include EachBatch`. # group_by_column: - # - Column name in the usage table to aggregate the data with. It should be the foreign - # key of the target_scope. e.g. If target_scope = Ci::Catalog::Resource, then - # group_by_column = :catalog_resource_id. + # - This should be the usage table's foreign key of the target_scope. + # e.g. target_scope = Ci::Catalog::Resource, group_by_column = :catalog_resource_id # cursor_type: # - Type of cursor to use. e.g. `:rolling_window` creates a cursor that always sets # the usage data window to the last 30 days. - # cursor_redis_key: - # - Redis key to save the cursor attributes. It should be unique for each - # target_scope/group_by_column/cursor_type combination. - # - # each_batch: - # - Yields each batch of `usage_counts` to the block. - # - `usage_counts` format: { target_object1 => 100, target_object2 => 200, ... } - # - Returns a Result containing the `cursor` object and `total_targets_completed`. + # lease_key: + # - Used for obtaining an exclusive lease. Also used as part of the cursor Redis key to + # save the cursor attributes. 
# # rubocop: disable CodeReuse/ActiveRecord -- Custom queries required for data processing class Aggregator include Gitlab::Utils::StrongMemoize + include ExclusiveLeaseGuard Result = Struct.new(:cursor, :total_targets_completed, keyword_init: true) TARGET_BATCH_SIZE = 1000 DISTINCT_USAGE_BATCH_SIZE = 100 MAX_RUNTIME = 4.minutes # Should be >= job scheduling frequency so there is no gap between job runs + LEASE_TIMEOUT = 5.minutes # Should be MAX_RUNTIME + extra time to execute `&usage_counts_block` CURSOR_TYPES = { rolling_window: Aggregators::Cursors::RollingWindow }.freeze - def initialize(target_scope:, group_by_column:, cursor_type:, cursor_redis_key:) + def initialize(target_scope:, group_by_column:, cursor_type:, lease_key:) @target_scope = target_scope @group_by_column = group_by_column - @cursor = CURSOR_TYPES[cursor_type].new(cursor_redis_key, target_scope: target_scope) + @lease_key = lease_key # Used by ExclusiveLeaseGuard + @cursor = CURSOR_TYPES[cursor_type].new("#{lease_key}:cursor", target_scope: target_scope) @runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(MAX_RUNTIME) end def each_batch(&usage_counts_block) - total_targets_completed = process_targets(&usage_counts_block) + try_obtain_lease do + total_targets_completed = process_targets(&usage_counts_block) - Result.new(cursor: cursor, total_targets_completed: total_targets_completed) + Result.new(cursor: cursor, total_targets_completed: total_targets_completed) + end end private - attr_reader :target_scope, :group_by_column, :runtime_limiter, :cursor + attr_reader :target_scope, :group_by_column, :cursor, :runtime_limiter def process_targets total_targets_completed = 0 @@ -76,8 +110,8 @@ def process_targets break if runtime_limiter.over_time? end - # Only advance the cursor if we're not in the middle of a count - cursor.advance unless cursor.changed? 
&& cursor.last_usage_count > 0 + # Only advance the cursor if we're not in the middle of aggregating the usage count for a target + cursor.advance if cursor.unchanged? || cursor.last_usage_count == 0 cursor.save! total_targets_completed @@ -88,7 +122,7 @@ def aggregate_usage_counts(targets_relation) targets_relation.order(:id).each do |target| # When target.id is different from the cursor's target_id, it - # resets last_usage_count and last_used_by_project_id to 0 + # resets last_usage_count and last_used_by_project_id to 0. cursor.target_id = target.id usage_scope = ::Ci::Catalog::Resources::Components::Usage @@ -104,8 +138,9 @@ def aggregate_usage_counts(targets_relation) count = usages_relation.count usage_counts[target] += count - # If we have less than the batch size of usage records, we're done for - # the current scope and we don't need to save the last usage count. + # If we're over time and count == batch size, it means there is likely another batch + # to process for the current target, so the usage count is incomplete. We store the + # last used_by_project_id and count so that we can resume counting on the next run. if runtime_limiter.over_time? 
&& count == DISTINCT_USAGE_BATCH_SIZE cursor.last_used_by_project_id = usages_relation.maximum(:used_by_project_id).to_i cursor.last_usage_count = usage_counts[target] @@ -120,6 +155,10 @@ def aggregate_usage_counts(targets_relation) usage_counts end + + def lease_timeout + LEASE_TIMEOUT + end end # rubocop: enable CodeReuse/ActiveRecord end diff --git a/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb b/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb index e75e9e8ad3ad8c..1e09283c353d11 100644 --- a/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb +++ b/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb @@ -6,8 +6,10 @@ module Components module Usages module Aggregators module Cursors - # This class represents a Redis cursor that keeps track of the processing - # position and data window in Gitlab::Ci::Components::Usages::Aggregator. + # This class represents a Redis cursor that keeps track of the data processing position + # and progression in Gitlab::Ci::Components::Usages::Aggregator. It updates and saves the + # attributes necessary for the aggregation to resume from where it left off on its next run. + # It also handles the logic for setting/advancing the cursor position and data window. class Base include Gitlab::Utils::StrongMemoize @@ -43,8 +45,8 @@ def data_ready? window.end_date < today end - def changed? - initial_attributes != attributes + def unchanged? 
+ initial_attributes == attributes end def attributes diff --git a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb index 26fde90543dc08..b54840c31f5f82 100644 --- a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb +++ b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb @@ -10,7 +10,7 @@ let(:usage_model) { Ci::Catalog::Resources::Components::Usage } let(:target_scope) { Ci::Catalog::Resource } let(:group_by_column) { :catalog_resource_id } - let(:cursor_redis_key) { 'my_redis_key:cursor' } + let(:lease_key) { 'my_lease_key' } let(:advanced_cursor_attributes) do { target_id: 0, @@ -173,6 +173,14 @@ it_behaves_like 'when the runtime limit is not reached' it_behaves_like 'with multiple distinct usage batches' end + + it 'prevents parallel processing with an exclusive lease guard' do + lease = Gitlab::ExclusiveLease.new(lease_key, timeout: 1.minute).tap(&:try_obtain) + result = run_new_aggregator_each_batch.last + + expect(result).to be_nil + lease.cancel + end end private @@ -182,7 +190,7 @@ def run_new_aggregator_each_batch target_scope: target_scope, group_by_column: group_by_column, cursor_type: cursor_type, - cursor_redis_key: cursor_redis_key + lease_key: lease_key ) batched_usage_counts = [] diff --git a/spec/lib/gitlab/ci/components/usages/aggregators/cursors/base_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregators/cursors/base_spec.rb index 0bc184fd486cee..f8640186ee208d 100644 --- a/spec/lib/gitlab/ci/components/usages/aggregators/cursors/base_spec.rb +++ b/spec/lib/gitlab/ci/components/usages/aggregators/cursors/base_spec.rb @@ -171,32 +171,32 @@ def default_window end end - describe '#changed?' do - subject { cursor.changed? } + describe '#unchanged?' do + subject { cursor.unchanged? 
} - it { is_expected.to eq(false) } + it { is_expected.to eq(true) } context 'when cursor target_id is updated' do - it 'returns true' do + it 'returns false' do cursor.target_id += 1 - is_expected.to eq(true) + is_expected.to eq(false) end end context 'when cursor last_used_by_project_id is updated' do - it 'returns true' do + it 'returns false' do cursor.last_used_by_project_id += 1 - is_expected.to eq(true) + is_expected.to eq(false) end end context 'when cursor last_usage_count is updated' do - it 'returns true' do + it 'returns false' do cursor.last_usage_count += 1 - is_expected.to eq(true) + is_expected.to eq(false) end end end -- GitLab From 30c5e18b54ac9f171d542eea9dd69950e327d9f5 Mon Sep 17 00:00:00 2001 From: lma-git Date: Fri, 10 May 2024 20:36:05 -0700 Subject: [PATCH 4/4] Remove window management from cursor Refactor and remove the window management responsibility from the Cursor class. --- lib/gitlab/ci/components/usages/aggregator.rb | 75 +++--- .../components/usages/aggregators/cursor.rb | 118 +++++++++ .../usages/aggregators/cursors/base.rb | 142 ----------- .../aggregators/cursors/rolling_window.rb | 31 --- .../ci/components/usages/aggregator_spec.rb | 105 ++++---- .../usages/aggregators/cursor_spec.rb | 167 +++++++++++++ .../usages/aggregators/cursors/base_spec.rb | 230 ------------------ .../cursors/rolling_window_spec.rb | 104 -------- 8 files changed, 366 insertions(+), 606 deletions(-) create mode 100644 lib/gitlab/ci/components/usages/aggregators/cursor.rb delete mode 100644 lib/gitlab/ci/components/usages/aggregators/cursors/base.rb delete mode 100644 lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window.rb create mode 100644 spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb delete mode 100644 spec/lib/gitlab/ci/components/usages/aggregators/cursors/base_spec.rb delete mode 100644 spec/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window_spec.rb diff --git 
a/lib/gitlab/ci/components/usages/aggregator.rb b/lib/gitlab/ci/components/usages/aggregator.rb index 92e2ff03a0bdf5..12fa36f9ec95b5 100644 --- a/lib/gitlab/ci/components/usages/aggregator.rb +++ b/lib/gitlab/ci/components/usages/aggregator.rb @@ -7,25 +7,33 @@ module Usages # Component usage is defined as the number of unique `used_by_project_id`s in the table # `p_catalog_resource_component_usages` for a given scope. # - # This aggregator iterates through the target scope in batches. For each target ID, it - # collects the usage count using `distinct_each_batch` for the given window. It utilizes - # a Redis cursor so that the counter can resume from where it left off on each run or - # advance to the next target/window. We collect the count in Rails because the SQL query + # This aggregator iterates through the target scope in batches. For each target ID, it collects + # the usage count using `distinct_each_batch` for the given usage window. Since this process can + # be interrupted when it reaches MAX_RUNTIME, we utilize a Redis cursor so the aggregator can + # resume from where it left off on each run. We collect the count in Rails because the SQL query # `COUNT(DISTINCT(*))` is not performant when the dataset is large. # + # RUNTIME: The actual total runtime will be slightly longer than MAX_RUNTIME because + # it depends on the execution time of `&usage_counts_block`. + # EXCLUSIVE LEASE: This aggregator is protected from parallel processing with an exclusive lease guard. + # WORKER: The worker running this service should be scheduled at the same cadence as MAX_RUNTIME, with: + # deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: LEASE_TIMEOUT + # ##### Usage # # each_batch: # - Yields each batch of `usage_counts` to the given block. + # - The block should be able to handle targets that might be reprocessed multiple times. # - `usage_counts` format: { target_object1 => 100, target_object2 => 200, ... 
} # - If the lease is obtained, returns a Result containing the `cursor` object and # `total_targets_completed`. Otherwise, returns nil. # # Example: # aggregator = Gitlab::Ci::Components::Usages::Aggregator.new( - # target_scope: Ci::Catalog::Resource, + # target_scope: Ci::Catalog::Resource.scope_to_get_only_unprocessed_targets, # group_by_column: :catalog_resource_id, - # cursor_type: :rolling_window, + # usage_start_date: Date.today - 30.days, + # usage_end_date: Date.today - 1.day, # lease_key: 'my_aggregator_service_lease_key' # ) # @@ -33,30 +41,17 @@ module Usages # # Bulk update usage counts in the database # end # - ##### Runtime, Exclusive Lease, and Worker - # - # The batch processing is limited to MAX_RUNTIME, but the actual total runtime will be - # slightly longer as it depends on the execution time of `&usage_counts_block`. - # This aggregator is protected from parallel processing with an exclusive lease guard. - # The worker running this service should be scheduled at the same cadence as MAX_RUNTIME - # so that the next job can be immediately queued after the current one completes. - # It should also have: - # deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: LEASE_TIMEOUT - # ##### Parameters # # target_scope: # - ActiveRecord relation to retrieve the target IDs. Processed in order of ID ascending. # - The target model class should have `include EachBatch`. - # group_by_column: - # - This should be the usage table's foreign key of the target_scope. - # e.g. target_scope = Ci::Catalog::Resource, group_by_column = :catalog_resource_id - # cursor_type: - # - Type of cursor to use. e.g. `:rolling_window` creates a cursor that always sets - # the usage data window to the last 30 days. - # lease_key: - # - Used for obtaining an exclusive lease. Also used as part of the cursor Redis key to - # save the cursor attributes. 
+ # - When cursor.target_id gets reset to 0, the aggregator may reprocess targets that have + # already been processed for the given usage window. To minimize redundant reprocessing, + # add a limiting condition to the target scope so it only retrieves unprocessed targets. + # group_by_column: This should be the usage table's foreign key of the target_scope. + # usage_start_date & usage_end_date: Date objects specifying the window of usage data to aggregate. + # lease_key: Used for obtaining an exclusive lease. Also used as part of the cursor Redis key. # # rubocop: disable CodeReuse/ActiveRecord -- Custom queries required for data processing class Aggregator @@ -70,16 +65,17 @@ class Aggregator MAX_RUNTIME = 4.minutes # Should be >= job scheduling frequency so there is no gap between job runs LEASE_TIMEOUT = 5.minutes # Should be MAX_RUNTIME + extra time to execute `&usage_counts_block` - CURSOR_TYPES = { - rolling_window: Aggregators::Cursors::RollingWindow - }.freeze - - def initialize(target_scope:, group_by_column:, cursor_type:, lease_key:) + def initialize(target_scope:, group_by_column:, usage_start_date:, usage_end_date:, lease_key:) @target_scope = target_scope @group_by_column = group_by_column @lease_key = lease_key # Used by ExclusiveLeaseGuard @runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(MAX_RUNTIME) + + @cursor = Aggregators::Cursor.new( + redis_key: "#{lease_key}:cursor", + target_scope: target_scope, + usage_window: Aggregators::Cursor::Window.new(usage_start_date, usage_end_date) + ) end def each_batch(&usage_counts_block) @@ -95,11 +91,9 @@ def each_batch(&usage_counts_block) attr_reader :target_scope, :group_by_column, :cursor, :runtime_limiter def process_targets - total_targets_completed = 0 - return total_targets_completed unless cursor.data_ready? 
- # Restore the scope from cursor so we can resume from the last run restored_target_scope = target_scope.where('id >= ?', cursor.target_id) + total_targets_completed = 0 restored_target_scope.each_batch(of: TARGET_BATCH_SIZE) do |targets_relation| usage_counts = aggregate_usage_counts(targets_relation) @@ -110,8 +104,7 @@ def process_targets break if runtime_limiter.over_time? end - # Only advance the cursor if we're not in the middle of aggregating the usage count for a target - cursor.advance if cursor.unchanged? || cursor.last_usage_count == 0 + cursor.advance unless cursor.interrupted? cursor.save! total_targets_completed @@ -127,9 +120,9 @@ def aggregate_usage_counts(targets_relation) usage_scope = ::Ci::Catalog::Resources::Components::Usage .where(group_by_column => cursor.target_id) - .where(used_date: cursor.window.start_date..cursor.window.end_date) + .where(used_date: cursor.usage_window.start_date..cursor.usage_window.end_date) - # Restore the scope from cursor so we can resume from the last run + # Restore the scope from cursor so we can resume from the last run if interrupted restored_usage_scope = usage_scope.where('used_by_project_id > ?', cursor.last_used_by_project_id) usage_counts[target] = cursor.last_usage_count @@ -142,8 +135,10 @@ def aggregate_usage_counts(targets_relation) # to process for the current target, so the usage count is incomplete. We store the # last used_by_project_id and count so that we can resume counting on the next run. if runtime_limiter.over_time? 
&& count == DISTINCT_USAGE_BATCH_SIZE - cursor.last_used_by_project_id = usages_relation.maximum(:used_by_project_id).to_i - cursor.last_usage_count = usage_counts[target] + cursor.interrupt!( + last_used_by_project_id: usages_relation.maximum(:used_by_project_id).to_i, + last_usage_count: usage_counts[target] + ) usage_counts.delete(target) # Remove the incomplete count break diff --git a/lib/gitlab/ci/components/usages/aggregators/cursor.rb b/lib/gitlab/ci/components/usages/aggregators/cursor.rb new file mode 100644 index 00000000000000..b04ed03c76bdfc --- /dev/null +++ b/lib/gitlab/ci/components/usages/aggregators/cursor.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +module Gitlab + module Ci + module Components + module Usages + module Aggregators + # This class represents a Redis cursor that keeps track of the data processing + # position and progression in Gitlab::Ci::Components::Usages::Aggregator. It + # updates and saves the attributes necessary for the aggregation to resume + # from where it was interrupted on its last run. + # + # The cursor's target_id is reset to 0 under these circumstances: + # 1. When the Redis cursor is first initialized. + # 2. When the Redis cursor expires or is lost and must be re-initialized. + # 3. When the cursor advances past max_target_id. + # + ##### Attributes + # + # target_id: The target ID from which to resume aggregating the usage counts. + # usage_window: The window of usage data to aggregate. + # last_used_by_project_id: The last used_by_project_id that was counted before interruption. + # last_usage_count: The last usage_count that was recorded before interruption. + # + # The last_used_by_project_id and last_usage_count only pertain to the exact target_id + # and usage_window that was saved before interruption. If either of the latter attributes + # change, then we reset the last_* values to 0. 
+ # + class Cursor + include Gitlab::Utils::StrongMemoize + + Window = Struct.new(:start_date, :end_date) + + CURSOR_REDIS_KEY_TTL = 7.days + + attr_reader :target_id, :usage_window, :last_used_by_project_id, :last_usage_count, :interrupted + + alias_method :interrupted?, :interrupted + + def initialize(redis_key:, target_scope:, usage_window:) + @redis_key = redis_key + @target_scope = target_scope + @usage_window = usage_window + @interrupted = false + + fetch_initial_attributes! + end + + def interrupt!(last_used_by_project_id:, last_usage_count:) + @last_used_by_project_id = last_used_by_project_id + @last_usage_count = last_usage_count + @interrupted = true + end + + def target_id=(target_id) + reset_last_usage_attributes if target_id != self.target_id + @target_id = target_id + end + + def advance + self.target_id += 1 + self.target_id = 0 if target_id > max_target_id + end + + def attributes + { + target_id: target_id, + usage_window: usage_window, + last_used_by_project_id: last_used_by_project_id, + last_usage_count: last_usage_count + } + end + + def max_target_id + target_scope.maximum(:id).to_i + end + strong_memoize_attr :max_target_id + + def save! + Gitlab::Redis::SharedState.with do |redis| + redis.set(redis_key, attributes.to_json, ex: CURSOR_REDIS_KEY_TTL) + end + end + + private + + attr_reader :redis_key, :target_scope + + def fetch_initial_attributes! + data = Gitlab::Redis::SharedState.with do |redis| + raw = redis.get(redis_key) + raw.present? ? 
Gitlab::Json.parse(raw) : {} + end.with_indifferent_access + + start_date = parse_date(data.dig(:usage_window, :start_date)) + end_date = parse_date(data.dig(:usage_window, :end_date)) + + @target_id = data[:target_id].to_i + @last_used_by_project_id = data[:last_used_by_project_id].to_i + @last_usage_count = data[:last_usage_count].to_i + + reset_last_usage_attributes if usage_window != Window.new(start_date, end_date) + end + + def reset_last_usage_attributes + @last_used_by_project_id = 0 + @last_usage_count = 0 + end + + def parse_date(date_str) + Date.parse(date_str) if date_str + end + end + end + end + end + end +end diff --git a/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb b/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb deleted file mode 100644 index 1e09283c353d11..00000000000000 --- a/lib/gitlab/ci/components/usages/aggregators/cursors/base.rb +++ /dev/null @@ -1,142 +0,0 @@ -# frozen_string_literal: true - -module Gitlab - module Ci - module Components - module Usages - module Aggregators - module Cursors - # This class represents a Redis cursor that keeps track of the data processing position - # and progression in Gitlab::Ci::Components::Usages::Aggregator. It updates and saves the - # attributes necessary for the aggregation to resume from where it left off on its next run. - # It also handles the logic for setting/advancing the cursor position and data window. - class Base - include Gitlab::Utils::StrongMemoize - - Window = Struct.new(:start_date, :end_date) - - CURSOR_REDIS_KEY_TTL = 7.days - - attr_reader :target_id, :window - attr_accessor :last_used_by_project_id, :last_usage_count - - def initialize(redis_key, target_scope:) - @redis_key = redis_key - @target_scope = target_scope - - fetch_initial_attributes! 
- end - - def target_id=(target_id) - reset_last_usage_count if target_id != self.target_id - @target_id = target_id - end - - def advance - if target_id < max_target_id - self.target_id += 1 - else - self.target_id = 0 - self.window = next_window - end - end - - def data_ready? - window.end_date < today - end - - def unchanged? - initial_attributes == attributes - end - - def attributes - { - target_id: target_id, - window: window, - last_used_by_project_id: last_used_by_project_id, - last_usage_count: last_usage_count - } - end - - def max_target_id - target_scope.maximum(:id).to_i - end - strong_memoize_attr :max_target_id - - def save! - Gitlab::Redis::SharedState.with do |redis| - redis.set(redis_key, attributes.to_json, ex: CURSOR_REDIS_KEY_TTL) - end - end - - private - - attr_reader :redis_key, :target_scope, :initial_attributes - - def fetch_initial_attributes! - data = Gitlab::Redis::SharedState.with do |redis| - raw = redis.get(redis_key) - raw.present? ? Gitlab::Json.parse(raw) : {} - end.with_indifferent_access - - start_date = parse_date(data.dig(:window, :start_date)) - end_date = parse_date(data.dig(:window, :end_date)) - - @target_id = data[:target_id] || default_target_id - @window = start_date && end_date ? Window.new(start_date, end_date) : default_window - @last_used_by_project_id = data[:last_used_by_project_id].to_i - @last_usage_count = data[:last_usage_count].to_i - - self.window = default_window if use_default_window_when_lagging? 
&& window.end_date < yesterday - - @initial_attributes = attributes.deep_dup - end - - def window=(window) - reset_last_usage_count if window != self.window - @window = window - end - - def reset_last_usage_count - self.last_used_by_project_id = 0 - self.last_usage_count = 0 - end - - # This time zone should be consistent with - # `Ci::Catalog::Resources::Components::Usage#set_used_date` - def today - Date.today - end - strong_memoize_attr :today - - def yesterday - Date.today - 1.day - end - strong_memoize_attr :yesterday - - def parse_date(date_str) - Date.parse(date_str) if date_str - end - - def default_target_id - 0 - end - - def use_default_window_when_lagging? - false - end - - def default_window - raise NotImplementedError - end - - def next_window - raise NotImplementedError - end - end - end - end - end - end - end -end diff --git a/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window.rb b/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window.rb deleted file mode 100644 index 74d0d722599176..00000000000000 --- a/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window.rb +++ /dev/null @@ -1,31 +0,0 @@ -# frozen_string_literal: true - -module Gitlab - module Ci - module Components - module Usages - module Aggregators - module Cursors - class RollingWindow < Base - WINDOW_LENGTH = 30.days - - private - - def use_default_window_when_lagging? 
- true - end - - def default_window - Window.new(today - WINDOW_LENGTH, today - 1.day) - end - - def next_window - Window.new(window.start_date + 1.day, window.end_date + 1.day) - end - end - end - end - end - end - end -end diff --git a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb index b54840c31f5f82..f83f29939e37f5 100644 --- a/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb +++ b/spec/lib/gitlab/ci/components/usages/aggregator_spec.rb @@ -4,78 +4,64 @@ RSpec.describe Gitlab::Ci::Components::Usages::Aggregator, :clean_gitlab_redis_shared_state, :freeze_time, feature_category: :pipeline_composition do - let_it_be(:today) { Date.today } - let_it_be(:yesterday) { today - 1.day } + let_it_be(:usage_start_date) { Date.today - 30.days } + let_it_be(:usage_end_date) { Date.today - 1.day } let(:usage_model) { Ci::Catalog::Resources::Components::Usage } let(:target_scope) { Ci::Catalog::Resource } let(:group_by_column) { :catalog_resource_id } let(:lease_key) { 'my_lease_key' } - let(:advanced_cursor_attributes) do - { - target_id: 0, - window: next_window, - last_used_by_project_id: 0, - last_usage_count: 0 - } - end - describe 'with cursor_type: :rolling_window' do - let(:cursor_type) { :rolling_window } + let(:usage_window) do + Gitlab::Ci::Components::Usages::Aggregators::Cursor::Window.new(usage_start_date, usage_end_date) + end - let_it_be(:thirty_days_ago) { today - 30.days } - let_it_be(:cursor_class) { Gitlab::Ci::Components::Usages::Aggregators::Cursors::RollingWindow } + before_all do + # First catalog resource: 3 components and 3 usages per component on usage_end_date + version = create(:ci_catalog_resource_version) + create_list(:ci_catalog_resource_component, 3, version: version).each do |component| + (1..3).each do |k| + create( + :ci_catalog_resource_component_usage, + component: component, + used_date: usage_end_date, + used_by_project_id: k + ) + end + end - 
let_it_be(:initial_redis_window) { cursor_class::Window.new(thirty_days_ago - 1.day, yesterday - 1.day) } - let_it_be(:default_window) { cursor_class::Window.new(thirty_days_ago, yesterday) } - let_it_be(:next_window) { cursor_class::Window.new(thirty_days_ago + 1.day, today) } + # Create 4 more catalog resources, each with 1-4 components and 0-6 usages + # per component on different dates before and after usage_end_date + create_list(:ci_catalog_resource_version, 4).each_with_index do |version, i| + create_list(:ci_catalog_resource_component, i + 1, version: version).each_with_index do |component, j| + next unless j > 0 - before_all do - # First catalog resource: 3 components and 3 usages per component within the default window - version = create(:ci_catalog_resource_version) - create_list(:ci_catalog_resource_component, 3, version: version).each do |component| - (1..3).each do |k| + (1..j * 2).each do |k| create( :ci_catalog_resource_component_usage, component: component, - used_date: default_window.end_date, + used_date: usage_end_date - 3.days + k.days, used_by_project_id: k ) end end - - # Create 4 more catalog resources, each with 1-4 components and 0-6 usages per component across different windows - create_list(:ci_catalog_resource_version, 4).each_with_index do |version, i| - create_list(:ci_catalog_resource_component, i + 1, version: version).each_with_index do |component, j| - next unless j > 0 - - (1..j * 2).each do |k| - create( - :ci_catalog_resource_component_usage, - component: component, - used_date: initial_redis_window.end_date - 3.days + k.days, - used_by_project_id: k - ) - end - end - end end + end + describe '#each_batch' do shared_examples 'when the runtime limit is not reached' do - it 'returns the expected result for each run' do - # On 1st run, we process all catalog resources and advance the cursor + it 'returns the expected result' do + # We process all catalog resources and advance the cursor batched_usage_counts, result = 
run_new_aggregator_each_batch expect(batched_usage_counts).to eq(expected_batched_usage_counts) expect(result.total_targets_completed).to eq(target_scope.count) - expect(result.cursor.attributes).to eq(advanced_cursor_attributes) - - # On 2nd run, it's not tomorrow yet, so the cursor remains unchanged - batched_usage_counts, result = run_new_aggregator_each_batch - - expect(batched_usage_counts).to eq([]) - expect(result.total_targets_completed).to eq(0) - expect(result.cursor.attributes).to eq(advanced_cursor_attributes) + expect(result.cursor.attributes).to eq({ + target_id: 0, + usage_window: usage_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) end end @@ -100,7 +86,7 @@ expect(result.total_targets_completed).to eq(0) expect(result.cursor.attributes).to eq({ target_id: target_scope.first.id, - window: default_window, + usage_window: usage_window, last_used_by_project_id: 2, last_usage_count: 2 }) @@ -112,12 +98,12 @@ expect(result.total_targets_completed).to eq(1) expect(result.cursor.attributes).to eq({ target_id: target_scope.first.id + 1, - window: default_window, + usage_window: usage_window, last_used_by_project_id: 0, last_usage_count: 0 }) - all_batched_usage_counts = batched_usage_counts + repeat_aggregator_each_batch_until_done + all_batched_usage_counts = batched_usage_counts + repeat_new_aggregator_each_batch_until_done batched_usage_counts_merged = all_batched_usage_counts.flatten.reduce(&:merge) expect(batched_usage_counts_merged.length).to eq(5) @@ -133,14 +119,14 @@ expect(result.total_targets_completed).to eq(0) expect(result.cursor.attributes).to eq({ target_id: target_scope.first.id, - window: default_window, + usage_window: usage_window, last_used_by_project_id: 2, last_usage_count: 2 }) target_scope.first.delete - all_batched_usage_counts = repeat_aggregator_each_batch_until_done + all_batched_usage_counts = repeat_new_aggregator_each_batch_until_done batched_usage_counts_merged = all_batched_usage_counts.reduce(&:merge) 
expect(batched_usage_counts_merged.length).to eq(4) @@ -152,7 +138,7 @@ it 'returns the expected result' do usage_model.delete_all - all_batched_usage_counts = repeat_aggregator_each_batch_until_done + all_batched_usage_counts = repeat_new_aggregator_each_batch_until_done batched_usage_counts_merged = all_batched_usage_counts.reduce(&:merge) expect(batched_usage_counts_merged.length).to eq(5) @@ -189,7 +175,8 @@ def run_new_aggregator_each_batch aggregator = described_class.new( target_scope: target_scope, group_by_column: group_by_column, - cursor_type: cursor_type, + usage_start_date: usage_start_date, + usage_end_date: usage_end_date, lease_key: lease_key ) @@ -209,7 +196,7 @@ def expected_batched_usage_counts usage_counts = usage_model .includes(:catalog_resource) .select('catalog_resource_id, COUNT(DISTINCT used_by_project_id) AS usage_count') - .where(used_date: default_window.start_date..default_window.end_date) + .where(used_date: usage_start_date..usage_end_date) .where(group_by_column => targets) .group(:catalog_resource_id) .each_with_object({}) { |r, hash| hash[r.catalog_resource] = r.usage_count } @@ -224,13 +211,13 @@ def expected_batched_usage_counts_merged expected_batched_usage_counts.reduce(&:merge) end - def repeat_aggregator_each_batch_until_done + def repeat_new_aggregator_each_batch_until_done all_batched_usage_counts = [] 30.times do batched_usage_counts, result = run_new_aggregator_each_batch all_batched_usage_counts << batched_usage_counts - break if result.cursor.window == next_window + break if result.cursor.target_id == 0 end all_batched_usage_counts.flatten diff --git a/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb new file mode 100644 index 00000000000000..d4d416e4e586bb --- /dev/null +++ b/spec/lib/gitlab/ci/components/usages/aggregators/cursor_spec.rb @@ -0,0 +1,167 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe 
Gitlab::Ci::Components::Usages::Aggregators::Cursor, :clean_gitlab_redis_shared_state, + feature_category: :pipeline_composition do + let(:redis_key) { 'my_redis_key:cursor' } + let(:target_scope) { class_double(Ci::Catalog::Resource, maximum: max_target_id) } + let(:max_target_id) { initial_redis_attributes[:target_id] } + + let(:usage_window) { described_class::Window.new(Date.parse('2024-01-08'), Date.parse('2024-01-14')) } + let(:initial_redis_usage_window) { usage_window } + + let(:initial_redis_attributes) do + { + target_id: 1, + usage_window: initial_redis_usage_window, + last_used_by_project_id: 100, + last_usage_count: 10 + } + end + + subject(:cursor) { described_class.new(redis_key: redis_key, target_scope: target_scope, usage_window: usage_window) } + + before do + Gitlab::Redis::SharedState.with do |redis| + redis.set(redis_key, initial_redis_attributes.to_json) + end + end + + describe '.new' do + it 'fetches and parses the attributes from Redis' do + expect(cursor.attributes).to eq(initial_redis_attributes) + end + + context 'when Redis usage_window is different than the given usage_window' do + let(:initial_redis_usage_window) do + described_class::Window.new(Date.parse('2024-01-01'), Date.parse('2024-01-07')) + end + + it 'resets last usage attributes' do + expect(cursor.attributes).to eq({ + target_id: initial_redis_attributes[:target_id], + usage_window: usage_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + + context 'when cursor does not exist in Redis' do + before do + Gitlab::Redis::SharedState.with do |redis| + redis.del(redis_key) + end + end + + it 'sets target_id and last usage attributes to zero' do + expect(cursor.attributes).to eq({ + target_id: 0, + usage_window: usage_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + end + + describe '#interrupt!' do + it 'updates last usage attributes and sets interrupted? 
to true' do + expect(cursor.interrupted?).to eq(false) + + cursor.interrupt!( + last_used_by_project_id: initial_redis_attributes[:last_used_by_project_id] + 1, + last_usage_count: initial_redis_attributes[:last_usage_count] + 1 + ) + + expect(cursor.interrupted?).to eq(true) + expect(cursor.attributes).to eq({ + target_id: initial_redis_attributes[:target_id], + usage_window: usage_window, + last_used_by_project_id: initial_redis_attributes[:last_used_by_project_id] + 1, + last_usage_count: initial_redis_attributes[:last_usage_count] + 1 + }) + end + end + + describe '#target_id=(target_id)' do + context 'when new target_id is different from cursor target_id' do + it 'sets new target_id and resets last usage attributes' do + cursor.target_id = initial_redis_attributes[:target_id] + 1 + + expect(cursor.attributes).to eq({ + target_id: initial_redis_attributes[:target_id] + 1, + usage_window: usage_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + + context 'when new target_id is the same as cursor target_id' do + it 'does not change cursor attributes' do + expect(cursor.attributes).to eq(initial_redis_attributes) + end + end + end + + describe '#advance' do + context 'when cursor target_id is less than max_target_id' do + let(:max_target_id) { initial_redis_attributes[:target_id] + 100 } + + it 'increments cursor target_id and resets last usage attributes' do + cursor.advance + + expect(cursor.attributes).to eq({ + target_id: initial_redis_attributes[:target_id] + 1, + usage_window: usage_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + + context 'when cursor target_id is equal to or greater than max_target_id' do + it 'resets cursor target_id and last usage attributes' do + cursor.advance + + expect(cursor.attributes).to eq({ + target_id: 0, + usage_window: usage_window, + last_used_by_project_id: 0, + last_usage_count: 0 + }) + end + end + end + + describe '#max_target_id' do + let(:target_scope) { 
Ci::Catalog::Resource } + + before_all do + create(:ci_catalog_resource, id: 123) + create(:ci_catalog_resource, id: 100) + end + + it 'returns maximum ID of the target scope' do + expect(cursor.max_target_id).to eq(123) + end + end + + describe '#save!' do + it 'saves cursor attributes to Redis as JSON' do + cursor.target_id = 11 + cursor.interrupt!( + last_used_by_project_id: 33, + last_usage_count: 22 + ) + + cursor.save! + data = Gitlab::Redis::SharedState.with { |redis| redis.get(redis_key) } + + expect(data).to eq('{"target_id":11,"usage_window":{"start_date":"2024-01-08","end_date":"2024-01-14"},' \ + '"last_used_by_project_id":33,"last_usage_count":22}') + end + end +end diff --git a/spec/lib/gitlab/ci/components/usages/aggregators/cursors/base_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregators/cursors/base_spec.rb deleted file mode 100644 index f8640186ee208d..00000000000000 --- a/spec/lib/gitlab/ci/components/usages/aggregators/cursors/base_spec.rb +++ /dev/null @@ -1,230 +0,0 @@ -# frozen_string_literal: true - -require 'spec_helper' - -RSpec.describe Gitlab::Ci::Components::Usages::Aggregators::Cursors::Base, :clean_gitlab_redis_shared_state, - feature_category: :pipeline_composition do - let(:initial_redis_window) { described_class::Window.new(Date.parse('2024-01-01'), Date.parse('2024-01-07')) } - let(:default_window) { described_class::Window.new(Date.parse('2024-01-08'), Date.parse('2024-01-14')) } - let(:next_window) { described_class::Window.new(Date.parse('2024-01-15'), Date.parse('2024-01-21')) } - - let(:cursor_class) do - Class.new(described_class) do - def default_window - self.class::Window.new(Date.parse('2024-01-08'), Date.parse('2024-01-14')) # default_window - end - - def next_window - self.class::Window.new(Date.parse('2024-01-15'), Date.parse('2024-01-21')) # next_window - end - end - end - - let(:redis_key) { 'my_redis_key:cursor' } - let(:target_scope) { class_double(Ci::Catalog::Resource, maximum: max_target_id) } - 
let(:max_target_id) { initial_redis_attributes[:target_id] } - - let(:initial_redis_attributes) do - { - target_id: 1, - window: initial_redis_window, - last_used_by_project_id: 100, - last_usage_count: 10 - } - end - - subject(:cursor) { cursor_class.new(redis_key, target_scope: target_scope) } - - before do - Gitlab::Redis::SharedState.with do |redis| - redis.set(redis_key, initial_redis_attributes.to_json) - end - end - - describe '.new' do - it 'fetches and parses the attributes from Redis' do - expect(cursor.attributes).to eq(initial_redis_attributes) - end - - context 'when cursor does not exist in Redis' do - before do - Gitlab::Redis::SharedState.with do |redis| - redis.del(redis_key) - end - end - - it 'sets cursor attributes to their default values' do - expect(cursor.attributes).to eq({ - target_id: 0, - window: default_window, - last_used_by_project_id: 0, - last_usage_count: 0 - }) - end - - context 'when `default_window` is not implemented' do - let(:cursor_class) do - Class.new(described_class) do - def next_window - self.class::Window.new(Date.parse('2024-01-15'), Date.parse('2024-01-21')) - end - end - end - - it 'raises NotImplementedError' do - expect { cursor }.to raise_error(NotImplementedError) - end - end - end - end - - describe '#target_id=(target_id)' do - context 'when new target_id is different from cursor target_id' do - it 'sets new target_id and resets last usage count' do - cursor.target_id = initial_redis_attributes[:target_id] + 1 - - expect(cursor.attributes).to eq({ - target_id: initial_redis_attributes[:target_id] + 1, - window: initial_redis_attributes[:window], - last_used_by_project_id: 0, - last_usage_count: 0 - }) - end - end - - context 'when new target_id is the same as cursor target_id' do - it 'does not change cursor attributes' do - expect(cursor.attributes).to eq(initial_redis_attributes) - end - end - end - - describe '#advance' do - context 'when cursor target_id is less than max_target_id' do - let(:max_target_id) 
{ initial_redis_attributes[:target_id] + 100 } - - it 'increments cursor target_id and resets last usage count' do - cursor.advance - - expect(cursor.attributes).to eq({ - target_id: initial_redis_attributes[:target_id] + 1, - window: initial_redis_attributes[:window], - last_used_by_project_id: 0, - last_usage_count: 0 - }) - end - end - - context 'when cursor target_id is equal to or greater than max_target_id' do - it 'resets cursor target_id, resets last usage count, and sets window to next window' do - cursor.advance - - expect(cursor.attributes).to eq({ - target_id: 0, - window: next_window, - last_used_by_project_id: 0, - last_usage_count: 0 - }) - end - - context 'when `next_window` is not implemented' do - let(:cursor_class) do - Class.new(described_class) do - def default_window - self.class::Window.new(Date.parse('2024-01-08'), Date.parse('2024-01-14')) - end - end - end - - it 'raises NotImplementedError' do - expect { cursor.advance }.to raise_error(NotImplementedError) - end - end - end - end - - describe '#data_ready?', :freeze_time do - subject { cursor.data_ready? } - - context 'when cursor window end_date is yesterday' do - it 'returns true' do - travel_to((initial_redis_window.end_date + 1.day).to_time) - - is_expected.to eq(true) - end - end - - context 'when cursor window end_date is today' do - it 'returns false' do - travel_to(initial_redis_window.end_date.to_time) - - is_expected.to eq(false) - end - end - - context 'when cursor window end_date is tomorrow' do - it 'returns false' do - travel_to((initial_redis_window.end_date - 1.day).to_time) - - is_expected.to eq(false) - end - end - end - - describe '#unchanged?' do - subject { cursor.unchanged? 
} - - it { is_expected.to eq(true) } - - context 'when cursor target_id is updated' do - it 'returns false' do - cursor.target_id += 1 - - is_expected.to eq(false) - end - end - - context 'when cursor last_used_by_project_id is updated' do - it 'returns false' do - cursor.last_used_by_project_id += 1 - - is_expected.to eq(false) - end - end - - context 'when cursor last_usage_count is updated' do - it 'returns false' do - cursor.last_usage_count += 1 - - is_expected.to eq(false) - end - end - end - - describe '#max_target_id' do - let(:target_scope) { Ci::Catalog::Resource } - - before_all do - create(:ci_catalog_resource, id: 123) - create(:ci_catalog_resource, id: 100) - end - - it 'returns maximum ID of the target scope' do - expect(cursor.max_target_id).to eq(123) - end - end - - describe '#save!' do - it 'saves cursor attributes to Redis as JSON' do - cursor.target_id = 11 - cursor.last_used_by_project_id = 33 - cursor.last_usage_count = 22 - cursor.save! - - data = Gitlab::Redis::SharedState.with { |redis| redis.get(redis_key) } - - expect(data).to eq('{"target_id":11,"window":{"start_date":"2024-01-01","end_date":"2024-01-07"},' \ - '"last_used_by_project_id":33,"last_usage_count":22}') - end - end -end diff --git a/spec/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window_spec.rb b/spec/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window_spec.rb deleted file mode 100644 index 845c0d65263392..00000000000000 --- a/spec/lib/gitlab/ci/components/usages/aggregators/cursors/rolling_window_spec.rb +++ /dev/null @@ -1,104 +0,0 @@ -# frozen_string_literal: true - -require 'spec_helper' - -RSpec.describe Gitlab::Ci::Components::Usages::Aggregators::Cursors::RollingWindow, :clean_gitlab_redis_shared_state, - :freeze_time, feature_category: :pipeline_composition do - let(:today) { Date.today } - let(:yesterday) { today - 1.day } - let(:thirty_days_ago) { today - 30.days } - - let(:initial_redis_window) { 
described_class::Window.new(thirty_days_ago - 1.day, yesterday - 1.day) } - let(:default_window) { described_class::Window.new(thirty_days_ago, yesterday) } - let(:next_window) { described_class::Window.new(thirty_days_ago + 1.day, today) } - - let(:redis_key) { 'my_redis_key:cursor' } - let(:target_scope) { class_double(Ci::Catalog::Resource, maximum: max_target_id) } - let(:max_target_id) { initial_redis_attributes[:target_id] } - - let(:initial_redis_attributes) do - { - target_id: 1, - window: initial_redis_window, - last_used_by_project_id: 100, - last_usage_count: 10 - } - end - - subject(:cursor) { described_class.new(redis_key, target_scope: target_scope) } - - before do - Gitlab::Redis::SharedState.with do |redis| - redis.set(redis_key, initial_redis_attributes.to_json) - end - end - - describe '.new' do - context 'when cursor window end_date is older than yesterday' do - it 'sets cursor window to default and resets last usage count' do - expect(cursor.attributes).to eq({ - target_id: initial_redis_attributes[:target_id], - window: default_window, - last_used_by_project_id: 0, - last_usage_count: 0 - }) - end - end - - context 'when cursor window end_date is yesterday' do - it 'does not change cursor attributes' do - travel_to((initial_redis_window.end_date + 1.day).to_time) - - expect(cursor.attributes).to eq(initial_redis_attributes) - end - end - - context 'when cursor window end_date is today' do - it 'does not change cursor attributes' do - travel_to(initial_redis_window.end_date.to_time) - - expect(cursor.attributes).to eq(initial_redis_attributes) - end - end - - context 'when cursor window end_date is tomorrow' do - it 'does not change cursor attributes' do - travel_to((initial_redis_window.end_date - 1.day).to_time) - - expect(cursor.attributes).to eq(initial_redis_attributes) - end - end - - context 'when cursor does not exist in Redis' do - before do - Gitlab::Redis::SharedState.with do |redis| - redis.del(redis_key) - end - end - - it 'sets 
cursor attributes to their default values' do - expect(cursor.attributes).to eq({ - target_id: 0, - window: default_window, - last_used_by_project_id: 0, - last_usage_count: 0 - }) - end - end - end - - describe '#advance' do - context 'when cursor target_id is equal to or greater than max_target_id' do - it 'resets cursor target_id, resets last usage count, and sets window to next window' do - cursor.advance - - expect(cursor.attributes).to eq({ - target_id: 0, - window: next_window, - last_used_by_project_id: 0, - last_usage_count: 0 - }) - end - end - end -end -- GitLab