From 03245ef939e6a2a097e32fcefa23626ac03d0387 Mon Sep 17 00:00:00 2001 From: rkumar555 Date: Tue, 4 Feb 2025 12:34:01 +0100 Subject: [PATCH 1/4] Deprecate create_tasks and use create_bulk_tasks --- .rubocop_todo/rspec/change_by_zero.yml | 1 - ee/app/models/search/zoekt/repository.rb | 12 +- .../search/zoekt/indexing_task_service.rb | 7 +- .../repo_marked_as_to_delete_event_worker.rb | 12 +- .../models/search/zoekt/repository_spec.rb | 130 +++--------------- ...o_marked_as_to_delete_event_worker_spec.rb | 2 +- 6 files changed, 24 insertions(+), 140 deletions(-) diff --git a/.rubocop_todo/rspec/change_by_zero.yml b/.rubocop_todo/rspec/change_by_zero.yml index 8b3c68c73ec4a6..de4be7b6b016f5 100644 --- a/.rubocop_todo/rspec/change_by_zero.yml +++ b/.rubocop_todo/rspec/change_by_zero.yml @@ -6,7 +6,6 @@ RSpec/ChangeByZero: - 'ee/spec/lib/ee/gitlab/scim/group/deprovisioning_service_spec.rb' - 'ee/spec/lib/merge_requests/external_status_check_changes_auditor_spec.rb' - 'ee/spec/models/ee/project_member_spec.rb' - - 'ee/spec/models/search/zoekt/repository_spec.rb' - 'ee/spec/requests/api/admin/search/zoekt_spec.rb' - 'ee/spec/requests/api/graphql/audit_events/streaming/instance_headers/destroy_spec.rb' - 'ee/spec/requests/api/graphql/mutations/audit_events/instance_external_audit_event_destinations/destroy_spec.rb' diff --git a/ee/app/models/search/zoekt/repository.rb b/ee/app/models/search/zoekt/repository.rb index e81fc7ee140960..0841686796c3a5 100644 --- a/ee/app/models/search/zoekt/repository.rb +++ b/ee/app/models/search/zoekt/repository.rb @@ -53,16 +53,8 @@ class Repository < ApplicationRecord scope :searchable, -> { where(state: SEARCHABLE_STATES) } - def self.create_tasks(project_id:, zoekt_index:, task_type:, perform_at:) - project = Project.find_by_id(project_id) - find_or_initialize_by(project_identifier: project_id, project: project, zoekt_index: zoekt_index).tap do |item| - item.save! if item.new_record?
- break if item.tasks.pending.exists?(zoekt_node_id: zoekt_index.zoekt_node_id, task_type: task_type) - break if item.failed? && task_type != :delete_repo - - item.initializing! if item.pending? - item.tasks.create!(zoekt_node_id: zoekt_index.zoekt_node_id, task_type: task_type, perform_at: perform_at) - end + def self.find_or_create_by_project(identifier, project, zoekt_index:) + find_or_create_by!(project_identifier: identifier, project: project, zoekt_index: zoekt_index) end def self.create_bulk_tasks(task_type: :index_repo, perform_at: Time.zone.now) diff --git a/ee/app/services/search/zoekt/indexing_task_service.rb b/ee/app/services/search/zoekt/indexing_task_service.rb index 6cc7f90eca6811..6f138fee703795 100644 --- a/ee/app/services/search/zoekt/indexing_task_service.rb +++ b/ee/app/services/search/zoekt/indexing_task_service.rb @@ -54,11 +54,8 @@ def execute perform_at = Time.current perform_at += delay if delay - ApplicationRecord.transaction do - Repository.create_tasks( - project_id: project_id, zoekt_index: idx, task_type: current_task_type, perform_at: perform_at - ) - end + zoekt_repo = idx.zoekt_repositories.find_or_create_by_project(project_id, project, zoekt_index: idx) + Repository.id_in(zoekt_repo).create_bulk_tasks(task_type: current_task_type, perform_at: perform_at) end end diff --git a/ee/app/workers/search/zoekt/repo_marked_as_to_delete_event_worker.rb b/ee/app/workers/search/zoekt/repo_marked_as_to_delete_event_worker.rb index 100e08be81c966..7d7ff48d9f9a14 100644 --- a/ee/app/workers/search/zoekt/repo_marked_as_to_delete_event_worker.rb +++ b/ee/app/workers/search/zoekt/repo_marked_as_to_delete_event_worker.rb @@ -15,17 +15,7 @@ class RepoMarkedAsToDeleteEventWorker BATCH_SIZE = 1000 def handle_event(_event) - repos = Repository.should_be_deleted.limit(BATCH_SIZE) - - repos.each do |repo| - # Note: this has checks in place that prevent creating duplicate tasks - Repository.create_tasks( - project_id: repo.project_identifier, - zoekt_index: 
repo.zoekt_index, - task_type: :delete_repo, - perform_at: Time.current - ) - end + Repository.should_be_deleted.limit(BATCH_SIZE).create_bulk_tasks(task_type: :delete_repo) end end end diff --git a/ee/spec/models/search/zoekt/repository_spec.rb b/ee/spec/models/search/zoekt/repository_spec.rb index 2553b18751c669..5681da3410d480 100644 --- a/ee/spec/models/search/zoekt/repository_spec.rb +++ b/ee/spec/models/search/zoekt/repository_spec.rb @@ -87,121 +87,27 @@ end end - describe '.create_tasks', :freeze_time do - let(:task_type) { :index_repo } - - context 'when repository does not exists for a project and zoekt_index' do - let_it_be(:project) { create(:project) } - let_it_be(:index) { create(:zoekt_index) } - - it 'creates a new initializing repository and task' do - perform_at = Time.zone.now - expect do - described_class.create_tasks(project_id: project.id, zoekt_index: index, task_type: task_type, - perform_at: perform_at - ) - end.to change { described_class.count }.by(1).and change { Search::Zoekt::Task.count }.by(1) - repo = described_class.last - expect(repo).to be_initializing - expect(repo.project).to eq project - expect(repo.zoekt_index).to eq index - task = Search::Zoekt::Task.last - expect(task.zoekt_repository).to eq repo - expect(task.project_identifier).to eq repo.project_identifier - expect(task).to be_index_repo - expect(task.perform_at).to eq perform_at + describe '.find_or_create_by_project' do + let_it_be(:zoekt_index) { create(:zoekt_index) } + let_it_be(:project) { create(:project) } + + context 'when zoekt_repository exists with the given params' do + let_it_be(:zoekt_repository) { create(:zoekt_repository, project: project, zoekt_index: zoekt_index) } + + it 'returns the existing record' do + result = nil + expect { result = described_class.find_or_create_by_project(project.id, project, zoekt_index: zoekt_index) } + .not_to change { described_class.count } + expect(result.project_identifier).to eq project.id end end - context 'when 
repository already exists for a project and zoekt_index' do - let_it_be(:repo) { create(:zoekt_repository) } - let_it_be(:zoekt_index) { repo.zoekt_index } - - it 'creates task' do - perform_at = Time.zone.now - expect do - described_class.create_tasks(project_id: repo.project_identifier, zoekt_index: zoekt_index, - task_type: task_type, perform_at: perform_at - ) - end.to change { described_class.count }.by(0).and change { Search::Zoekt::Task.count }.by(1) - task = Search::Zoekt::Task.last - expect(task.zoekt_repository).to eq repo - expect(task.project_identifier).to eq repo.project_identifier - expect(task).to be_index_repo - expect(task.perform_at).to eq perform_at - end - - context 'when project is already deleted' do - let_it_be(:repo_with_deleted_project) { create(:zoekt_repository, zoekt_index: zoekt_index) } - let_it_be(:repo_with_deleted_project2) { create(:zoekt_repository, zoekt_index: zoekt_index) } - - before do - [repo_with_deleted_project.project, repo_with_deleted_project2.project].map(&:destroy!) 
- end - - it 'creates task with the supplied project_id' do - perform_at = Time.zone.now - expect do - described_class.create_tasks(project_id: repo_with_deleted_project2.project_identifier, - zoekt_index: zoekt_index, task_type: :delete_repo, perform_at: perform_at - ) - end.to change { described_class.count }.by(0).and change { Search::Zoekt::Task.count }.by(1) - task = Search::Zoekt::Task.last - expect(task.zoekt_repository).to eq repo_with_deleted_project2 - expect(task.project_identifier).to eq repo_with_deleted_project2.project_identifier - expect(task).to be_delete_repo - expect(task.perform_at).to eq perform_at - end - end - - context 'when there is already a pending task with the provided zoekt_node_id and task_type' do - before do - create(:zoekt_task, :pending, zoekt_repository: repo, task_type: task_type, node: zoekt_index.node) - end - - it 'does not create the new task' do - expect do - described_class.create_tasks(project_id: repo.project_identifier, zoekt_index: zoekt_index, - task_type: task_type, perform_at: Time.zone.now - ) - end.not_to change { Search::Zoekt::Task.count } - end - end - end - - context 'when repository is in failed state' do - let_it_be(:repo) { create(:zoekt_repository, state: :failed) } - let_it_be(:zoekt_index) { repo.zoekt_index } - - context 'and task_type is index_repo' do - let(:task_type) { :index_repo } - - it 'does not creates task' do - perform_at = Time.zone.now - expect do - described_class.create_tasks(project_id: repo.project_identifier, zoekt_index: zoekt_index, - task_type: task_type, perform_at: perform_at - ) - end.not_to change { described_class.count } - end - end - - context 'and task_type is delete_repo' do - let(:task_type) { :delete_repo } - - it 'creates task' do - perform_at = Time.zone.now - expect do - described_class.create_tasks(project_id: repo.project_identifier, zoekt_index: zoekt_index, - task_type: task_type, perform_at: perform_at - ) - end.to change { described_class.count }.by(0).and change 
{ Search::Zoekt::Task.count }.by(1) - task = Search::Zoekt::Task.last - expect(task.zoekt_repository).to eq repo - expect(task.project_identifier).to eq repo.project_identifier - expect(task).to be_delete_repo - expect(task.perform_at).to eq perform_at - end + context 'when zoekt_repository does not exists with the given params' do + it 'creates and return the new record' do + result = nil + expect { result = described_class.find_or_create_by_project(project.id, project, zoekt_index: zoekt_index) } + .to change { described_class.count }.by(1) + expect(result.project_identifier).to eq project.id end end end diff --git a/ee/spec/workers/search/zoekt/repo_marked_as_to_delete_event_worker_spec.rb b/ee/spec/workers/search/zoekt/repo_marked_as_to_delete_event_worker_spec.rb index 3bb96c95b76903..a69daabb85c0ad 100644 --- a/ee/spec/workers/search/zoekt/repo_marked_as_to_delete_event_worker_spec.rb +++ b/ee/spec/workers/search/zoekt/repo_marked_as_to_delete_event_worker_spec.rb @@ -17,7 +17,7 @@ it_behaves_like 'an idempotent worker' do it 'creates a delete repo task for all repos in the list' do expect(Search::Zoekt::Repository).to receive(:should_be_deleted).and_return(scope) - expect(scope).to receive(:limit).with(described_class::BATCH_SIZE).and_return(repos) + expect(scope).to receive(:limit).with(described_class::BATCH_SIZE).and_call_original expect do consume_event(subscriber: described_class, event: event) -- GitLab From d7bdea7b660e9e835051ef981be0f2e3298a3b5e Mon Sep 17 00:00:00 2001 From: rkumar555 Date: Tue, 4 Feb 2025 17:01:05 +0100 Subject: [PATCH 2/4] Move the method to create zoekt repos to zoekt_index --- ee/app/models/search/zoekt/index.rb | 4 ++ ee/app/models/search/zoekt/repository.rb | 4 -- .../search/zoekt/indexing_task_service.rb | 2 +- ee/spec/models/search/zoekt/index_spec.rb | 41 +++++++++++++++++++ .../models/search/zoekt/repository_spec.rb | 25 ----------- 5 files changed, 46 insertions(+), 30 deletions(-) diff --git 
a/ee/app/models/search/zoekt/index.rb b/ee/app/models/search/zoekt/index.rb index a9de8aeadd205e..df71a1de953869 100644 --- a/ee/app/models/search/zoekt/index.rb +++ b/ee/app/models/search/zoekt/index.rb @@ -148,6 +148,10 @@ def should_be_deleted? SHOULD_BE_DELETED_STATES.include? state.to_sym end + def find_or_create_repository_by_project!(identifier, project) + zoekt_repositories.find_or_create_by!(project_identifier: identifier, project: project) + end + private def storage_percent_used diff --git a/ee/app/models/search/zoekt/repository.rb b/ee/app/models/search/zoekt/repository.rb index 0841686796c3a5..ed7c7ee02d14ef 100644 --- a/ee/app/models/search/zoekt/repository.rb +++ b/ee/app/models/search/zoekt/repository.rb @@ -53,10 +53,6 @@ class Repository < ApplicationRecord scope :searchable, -> { where(state: SEARCHABLE_STATES) } - def self.find_or_create_by_project(identifier, project, zoekt_index:) - find_or_create_by!(project_identifier: identifier, project: project, zoekt_index: zoekt_index) - end - def self.create_bulk_tasks(task_type: :index_repo, perform_at: Time.zone.now) scope = self unless task_type.to_sym == :delete_repo diff --git a/ee/app/services/search/zoekt/indexing_task_service.rb b/ee/app/services/search/zoekt/indexing_task_service.rb index 6f138fee703795..a8796b1b0a3034 100644 --- a/ee/app/services/search/zoekt/indexing_task_service.rb +++ b/ee/app/services/search/zoekt/indexing_task_service.rb @@ -54,7 +54,7 @@ def execute perform_at = Time.current perform_at += delay if delay - zoekt_repo = idx.zoekt_repositories.find_or_create_by_project(project_id, project, zoekt_index: idx) + zoekt_repo = idx.find_or_create_repository_by_project!(project_id, project) Repository.id_in(zoekt_repo).create_bulk_tasks(task_type: current_task_type, perform_at: perform_at) end end diff --git a/ee/spec/models/search/zoekt/index_spec.rb b/ee/spec/models/search/zoekt/index_spec.rb index 1a409b5fd65961..c2d2732405a001 100644 --- 
a/ee/spec/models/search/zoekt/index_spec.rb +++ b/ee/spec/models/search/zoekt/index_spec.rb @@ -540,4 +540,45 @@ expect(zoekt_index).not_to be_should_be_deleted end end + + describe '#find_or_create_repository_by_project!' do + let_it_be(:zoekt_index) { create(:zoekt_index) } + let_it_be(:project) { create(:project) } + + context 'when find_or_create_by! raises an error' do + before do + allow(zoekt_index).to receive_message_chain(:zoekt_repositories, :find_or_create_by!).and_raise(StandardError) + end + + it 'raises the error' do + expect do + zoekt_index.find_or_create_repository_by_project!(project.id, project) + end.to raise_error(StandardError).and not_change { Search::Zoekt::Repository.count } + end + end + + context 'when zoekt_repository exists with the given params' do + before do + create(:zoekt_repository, project: project, zoekt_index: zoekt_index) + end + + it 'returns the existing record' do + result = nil + expect do + result = zoekt_index.find_or_create_repository_by_project!(project.id, project) + end.not_to change { zoekt_index.zoekt_repositories.count } + expect(result.project_identifier).to eq project.id + end + end + + context 'when zoekt_repository does not exists with the given params' do + it 'creates and return the new record' do + result = nil + expect do + result = zoekt_index.find_or_create_repository_by_project!(project.id, project) + end.to change { zoekt_index.zoekt_repositories.count }.by(1) + expect(result.project_identifier).to eq project.id + end + end + end end diff --git a/ee/spec/models/search/zoekt/repository_spec.rb b/ee/spec/models/search/zoekt/repository_spec.rb index 5681da3410d480..7e3dc28409ee53 100644 --- a/ee/spec/models/search/zoekt/repository_spec.rb +++ b/ee/spec/models/search/zoekt/repository_spec.rb @@ -87,31 +87,6 @@ end end - describe '.find_or_create_by_project' do - let_it_be(:zoekt_index) { create(:zoekt_index) } - let_it_be(:project) { create(:project) } - - context 'when zoekt_repository exists with the 
given params' do - let_it_be(:zoekt_repository) { create(:zoekt_repository, project: project, zoekt_index: zoekt_index) } - - it 'returns the existing record' do - result = nil - expect { result = described_class.find_or_create_by_project(project.id, project, zoekt_index: zoekt_index) } - .not_to change { described_class.count } - expect(result.project_identifier).to eq project.id - end - end - - context 'when zoekt_repository does not exists with the given params' do - it 'creates and return the new record' do - result = nil - expect { result = described_class.find_or_create_by_project(project.id, project, zoekt_index: zoekt_index) } - .to change { described_class.count }.by(1) - expect(result.project_identifier).to eq project.id - end - end - end - describe '.create_bulk_tasks', :freeze_time do let_it_be(:zoekt_repo_with_pending_tasks) { create(:zoekt_repository) } let_it_be(:zoekt_repo_with_processing_tasks) { create(:zoekt_repository) } -- GitLab From 255b3ca916d7b363fd7f80effa4a288bdc5429b8 Mon Sep 17 00:00:00 2001 From: rkumar555 Date: Wed, 5 Feb 2025 15:08:56 +0100 Subject: [PATCH 3/4] Reduce the batch size of RepoToIndexEventWorker --- ee/app/models/search/zoekt/repository.rb | 9 +++++---- .../workers/search/zoekt/repo_to_index_event_worker.rb | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/ee/app/models/search/zoekt/repository.rb b/ee/app/models/search/zoekt/repository.rb index ed7c7ee02d14ef..90e89a3d1275ce 100644 --- a/ee/app/models/search/zoekt/repository.rb +++ b/ee/app/models/search/zoekt/repository.rb @@ -76,10 +76,11 @@ def self.create_bulk_tasks(task_type: :index_repo, perform_at: Time.zone.now) updated_at: Time.zone.now ) end - ApplicationRecord.transaction do - scope.pending.update_all(state: :initializing) - Search::Zoekt::Task.bulk_insert!(tasks) - end + task_ids = Task.bulk_insert!(tasks, returns: :ids) + return if task_ids.empty? 
+ + repo_ids = Task.id_in(task_ids).select(:zoekt_repository_id) + Repository.id_in(repo_ids).pending.update_all(state: :initializing) end private diff --git a/ee/app/workers/search/zoekt/repo_to_index_event_worker.rb b/ee/app/workers/search/zoekt/repo_to_index_event_worker.rb index bf90ef7c03f5e1..b0189e61e90643 100644 --- a/ee/app/workers/search/zoekt/repo_to_index_event_worker.rb +++ b/ee/app/workers/search/zoekt/repo_to_index_event_worker.rb @@ -11,7 +11,7 @@ class RepoToIndexEventWorker defer_on_database_health_signal :gitlab_main, [:zoekt_repositories, :zoekt_tasks], 10.minutes - BATCH_SIZE = 1000 + BATCH_SIZE = 500 def handle_event(_event) return false unless ::Search::Zoekt.licensed_and_indexing_enabled? -- GitLab From 55ecbf7f8c92b1339c84fdb4ec59cfc56721aa46 Mon Sep 17 00:00:00 2001 From: rkumar555 Date: Thu, 6 Feb 2025 12:25:49 +0100 Subject: [PATCH 4/4] Add batching in repos update --- ee/app/models/search/zoekt/repository.rb | 8 +++----- .../zoekt/repo_marked_as_to_delete_event_worker.rb | 2 +- ee/spec/models/search/zoekt/repository_spec.rb | 13 +++++++++++++ .../repo_marked_as_to_delete_event_worker_spec.rb | 5 ----- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/ee/app/models/search/zoekt/repository.rb b/ee/app/models/search/zoekt/repository.rb index 90e89a3d1275ce..9f4d43c91165ef 100644 --- a/ee/app/models/search/zoekt/repository.rb +++ b/ee/app/models/search/zoekt/repository.rb @@ -76,11 +76,9 @@ def self.create_bulk_tasks(task_type: :index_repo, perform_at: Time.zone.now) updated_at: Time.zone.now ) end - task_ids = Task.bulk_insert!(tasks, returns: :ids) - return if task_ids.empty? 
- - repo_ids = Task.id_in(task_ids).select(:zoekt_repository_id) - Repository.id_in(repo_ids).pending.update_all(state: :initializing) + Task.bulk_insert!(tasks) + repo_ids = tasks.map(&:zoekt_repository_id) + Repository.id_in(repo_ids).pending.each_batch { |repos| repos.update_all(state: :initializing) } end private diff --git a/ee/app/workers/search/zoekt/repo_marked_as_to_delete_event_worker.rb b/ee/app/workers/search/zoekt/repo_marked_as_to_delete_event_worker.rb index 7d7ff48d9f9a14..429d6ddbb1c807 100644 --- a/ee/app/workers/search/zoekt/repo_marked_as_to_delete_event_worker.rb +++ b/ee/app/workers/search/zoekt/repo_marked_as_to_delete_event_worker.rb @@ -12,7 +12,7 @@ class RepoMarkedAsToDeleteEventWorker defer_on_database_health_signal :gitlab_main, [:zoekt_repositories, :zoekt_tasks], 10.minutes - BATCH_SIZE = 1000 + BATCH_SIZE = 500 def handle_event(_event) Repository.should_be_deleted.limit(BATCH_SIZE).create_bulk_tasks(task_type: :delete_repo) diff --git a/ee/spec/models/search/zoekt/repository_spec.rb b/ee/spec/models/search/zoekt/repository_spec.rb index 7e3dc28409ee53..6e4884fb96382c 100644 --- a/ee/spec/models/search/zoekt/repository_spec.rb +++ b/ee/spec/models/search/zoekt/repository_spec.rb @@ -117,6 +117,19 @@ end context 'when task_type is index_repo' do + context 'when bulk_insert! 
raises an exception' do + before do + allow(Search::Zoekt::Task).to receive(:bulk_insert!).and_raise(ActiveRecord::StatementInvalid) + end + + it 'raises the exception' do + initial_zoekt_repo_states = described_class.all.pluck(:state, :id) + expect { described_class.create_bulk_tasks }.to raise_error(ActiveRecord::StatementInvalid) + .and not_change { Search::Zoekt::Task.count } + expect(described_class.all.pluck(:state, :id)).to match_array(initial_zoekt_repo_states) + end + end + it 'does not creates tasks for failed repos and creates tasks for repos which do not have pending tasks' do pending_tasks_count = zoekt_repo_with_pending_tasks.reload.tasks.count processing_tasks_count = zoekt_repo_with_processing_tasks.reload.tasks.count diff --git a/ee/spec/workers/search/zoekt/repo_marked_as_to_delete_event_worker_spec.rb b/ee/spec/workers/search/zoekt/repo_marked_as_to_delete_event_worker_spec.rb index a69daabb85c0ad..996c6b4c503e48 100644 --- a/ee/spec/workers/search/zoekt/repo_marked_as_to_delete_event_worker_spec.rb +++ b/ee/spec/workers/search/zoekt/repo_marked_as_to_delete_event_worker_spec.rb @@ -16,9 +16,6 @@ it_behaves_like 'an idempotent worker' do it 'creates a delete repo task for all repos in the list' do - expect(Search::Zoekt::Repository).to receive(:should_be_deleted).and_return(scope) - expect(scope).to receive(:limit).with(described_class::BATCH_SIZE).and_call_original - expect do consume_event(subscriber: described_class, event: event) end.to change { Search::Zoekt::Task.where(task_type: :delete_repo).size }.from(0).to(repos.size) @@ -26,8 +23,6 @@ it 'processes in batches' do stub_const("#{described_class}::BATCH_SIZE", 2) - expect(Search::Zoekt::Repository).to receive(:should_be_deleted).and_return(scope) - expect(scope).to receive(:limit).with(described_class::BATCH_SIZE).and_call_original expect do consume_event(subscriber: described_class, event: event) end.to change { Search::Zoekt::Task.where(task_type: :delete_repo).size 
}.from(0).to(described_class::BATCH_SIZE) -- GitLab