diff --git a/.rubocop_todo/rspec/change_by_zero.yml b/.rubocop_todo/rspec/change_by_zero.yml index 8b3c68c73ec4a6f8829fa53b500d98598064b21e..de4be7b6b016f5dcae14c8e2701944b00bd2aa4b 100644 --- a/.rubocop_todo/rspec/change_by_zero.yml +++ b/.rubocop_todo/rspec/change_by_zero.yml @@ -6,7 +6,6 @@ RSpec/ChangeByZero: - 'ee/spec/lib/ee/gitlab/scim/group/deprovisioning_service_spec.rb' - 'ee/spec/lib/merge_requests/external_status_check_changes_auditor_spec.rb' - 'ee/spec/models/ee/project_member_spec.rb' - - 'ee/spec/models/search/zoekt/repository_spec.rb' - 'ee/spec/requests/api/admin/search/zoekt_spec.rb' - 'ee/spec/requests/api/graphql/audit_events/streaming/instance_headers/destroy_spec.rb' - 'ee/spec/requests/api/graphql/mutations/audit_events/instance_external_audit_event_destinations/destroy_spec.rb' diff --git a/ee/app/models/search/zoekt/index.rb b/ee/app/models/search/zoekt/index.rb index a9de8aeadd205e6012737e1b773456003c9b285f..df71a1de95386954d3878f5cc5e04b63ae34ba4d 100644 --- a/ee/app/models/search/zoekt/index.rb +++ b/ee/app/models/search/zoekt/index.rb @@ -148,6 +148,10 @@ def should_be_deleted? SHOULD_BE_DELETED_STATES.include? state.to_sym end + def find_or_create_repository_by_project!(identifier, project) + zoekt_repositories.find_or_create_by!(project_identifier: identifier, project: project) + end + private def storage_percent_used diff --git a/ee/app/models/search/zoekt/repository.rb b/ee/app/models/search/zoekt/repository.rb index e81fc7ee140960e3e516b7ea434815e21494d3a6..9f4d43c91165ef04c30965a67a923843e25917de 100644 --- a/ee/app/models/search/zoekt/repository.rb +++ b/ee/app/models/search/zoekt/repository.rb @@ -53,18 +53,6 @@ class Repository < ApplicationRecord scope :searchable, -> { where(state: SEARCHABLE_STATES) } - def self.create_tasks(project_id:, zoekt_index:, task_type:, perform_at:) - project = Project.find_by_id(project_id) - find_or_initialize_by(project_identifier: project_id, project: project, zoekt_index: zoekt_index).tap do |item| - item.save! if item.new_record? - break if item.tasks.pending.exists?(zoekt_node_id: zoekt_index.zoekt_node_id, task_type: task_type) - break if item.failed? && task_type != :delete_repo - - item.initializing! if item.pending? - item.tasks.create!(zoekt_node_id: zoekt_index.zoekt_node_id, task_type: task_type, perform_at: perform_at) - end - end - def self.create_bulk_tasks(task_type: :index_repo, perform_at: Time.zone.now) scope = self unless task_type.to_sym == :delete_repo @@ -88,10 +76,9 @@ def self.create_bulk_tasks(task_type: :index_repo, perform_at: Time.zone.now) updated_at: Time.zone.now ) end - ApplicationRecord.transaction do - scope.pending.update_all(state: :initializing) - Search::Zoekt::Task.bulk_insert!(tasks) - end + Task.bulk_insert!(tasks) + repo_ids = tasks.map(&:zoekt_repository_id) + Repository.id_in(repo_ids).pending.each_batch { |repos| repos.update_all(state: :initializing) } end private diff --git a/ee/app/services/search/zoekt/indexing_task_service.rb b/ee/app/services/search/zoekt/indexing_task_service.rb index 6cc7f90eca68112cb016cda7c672a99509383070..a8796b1b0a30342f12c5747314a64a62178933c4 100644 --- a/ee/app/services/search/zoekt/indexing_task_service.rb +++ b/ee/app/services/search/zoekt/indexing_task_service.rb @@ -54,11 +54,8 @@ def execute perform_at = Time.current perform_at += delay if delay - ApplicationRecord.transaction do - Repository.create_tasks( - project_id: project_id, zoekt_index: idx, task_type: current_task_type, perform_at: perform_at - ) - end + zoekt_repo = idx.find_or_create_repository_by_project!(project_id, project) + Repository.id_in(zoekt_repo).create_bulk_tasks(task_type: current_task_type, perform_at: perform_at) end end diff --git a/ee/app/workers/search/zoekt/repo_marked_as_to_delete_event_worker.rb b/ee/app/workers/search/zoekt/repo_marked_as_to_delete_event_worker.rb index 100e08be81c966ba4ff6f917e83181feff7b813a..429d6ddbb1c80710a773bbffdc20eb156cdb75f9 100644 --- a/ee/app/workers/search/zoekt/repo_marked_as_to_delete_event_worker.rb +++ b/ee/app/workers/search/zoekt/repo_marked_as_to_delete_event_worker.rb @@ -12,20 +12,10 @@ class RepoMarkedAsToDeleteEventWorker defer_on_database_health_signal :gitlab_main, [:zoekt_repositories, :zoekt_tasks], 10.minutes - BATCH_SIZE = 1000 + BATCH_SIZE = 500 def handle_event(_event) - repos = Repository.should_be_deleted.limit(BATCH_SIZE) - - repos.each do |repo| - # Note: this has checks in place that prevent creating duplicate tasks - Repository.create_tasks( - project_id: repo.project_identifier, - zoekt_index: repo.zoekt_index, - task_type: :delete_repo, - perform_at: Time.current - ) - end + Repository.should_be_deleted.limit(BATCH_SIZE).create_bulk_tasks(task_type: :delete_repo) end end end diff --git a/ee/app/workers/search/zoekt/repo_to_index_event_worker.rb b/ee/app/workers/search/zoekt/repo_to_index_event_worker.rb index bf90ef7c03f5e1905a5eebe1b2e28ac47c70f235..b0189e61e906431f3b655e4a51f750dd1b5c22dd 100644 --- a/ee/app/workers/search/zoekt/repo_to_index_event_worker.rb +++ b/ee/app/workers/search/zoekt/repo_to_index_event_worker.rb @@ -11,7 +11,7 @@ class RepoToIndexEventWorker defer_on_database_health_signal :gitlab_main, [:zoekt_repositories, :zoekt_tasks], 10.minutes - BATCH_SIZE = 1000 + BATCH_SIZE = 500 def handle_event(_event) return false unless ::Search::Zoekt.licensed_and_indexing_enabled? diff --git a/ee/spec/models/search/zoekt/index_spec.rb b/ee/spec/models/search/zoekt/index_spec.rb index 1a409b5fd65961183753868d9d5b6b381096f616..c2d2732405a0012995a44632f54cb995df185c9c 100644 --- a/ee/spec/models/search/zoekt/index_spec.rb +++ b/ee/spec/models/search/zoekt/index_spec.rb @@ -540,4 +540,45 @@ expect(zoekt_index).not_to be_should_be_deleted end end + + describe '#find_or_create_repository_by_project!' do + let_it_be(:zoekt_index) { create(:zoekt_index) } + let_it_be(:project) { create(:project) } + + context 'when find_or_create_by! raises an error' do + before do + allow(zoekt_index).to receive_message_chain(:zoekt_repositories, :find_or_create_by!).and_raise(StandardError) + end + + it 'raises the error' do + expect do + zoekt_index.find_or_create_repository_by_project!(project.id, project) + end.to raise_error(StandardError).and not_change { Search::Zoekt::Repository.count } + end + end + + context 'when zoekt_repository exists with the given params' do + before do + create(:zoekt_repository, project: project, zoekt_index: zoekt_index) + end + + it 'returns the existing record' do + result = nil + expect do + result = zoekt_index.find_or_create_repository_by_project!(project.id, project) + end.not_to change { zoekt_index.zoekt_repositories.count } + expect(result.project_identifier).to eq project.id + end + end + + context 'when zoekt_repository does not exists with the given params' do + it 'creates and return the new record' do + result = nil + expect do + result = zoekt_index.find_or_create_repository_by_project!(project.id, project) + end.to change { zoekt_index.zoekt_repositories.count }.by(1) + expect(result.project_identifier).to eq project.id + end + end + end end diff --git a/ee/spec/models/search/zoekt/repository_spec.rb b/ee/spec/models/search/zoekt/repository_spec.rb index 2553b18751c6698d40e869732805784c3586829b..6e4884fb96382c7e7afc24315979adb8038ffc40 100644 --- a/ee/spec/models/search/zoekt/repository_spec.rb +++ b/ee/spec/models/search/zoekt/repository_spec.rb @@ -87,125 +87,6 @@ end end - describe '.create_tasks', :freeze_time do - let(:task_type) { :index_repo } - - context 'when repository does not exists for a project and zoekt_index' do - let_it_be(:project) { create(:project) } - let_it_be(:index) { create(:zoekt_index) } - - it 'creates a new initializing repository and task' do - perform_at = Time.zone.now - expect do - described_class.create_tasks(project_id: project.id, zoekt_index: index, task_type: task_type, - perform_at: perform_at - ) - end.to change { described_class.count }.by(1).and change { Search::Zoekt::Task.count }.by(1) - repo = described_class.last - expect(repo).to be_initializing - expect(repo.project).to eq project - expect(repo.zoekt_index).to eq index - task = Search::Zoekt::Task.last - expect(task.zoekt_repository).to eq repo - expect(task.project_identifier).to eq repo.project_identifier - expect(task).to be_index_repo - expect(task.perform_at).to eq perform_at - end - end - - context 'when repository already exists for a project and zoekt_index' do - let_it_be(:repo) { create(:zoekt_repository) } - let_it_be(:zoekt_index) { repo.zoekt_index } - - it 'creates task' do - perform_at = Time.zone.now - expect do - described_class.create_tasks(project_id: repo.project_identifier, zoekt_index: zoekt_index, - task_type: task_type, perform_at: perform_at - ) - end.to change { described_class.count }.by(0).and change { Search::Zoekt::Task.count }.by(1) - task = Search::Zoekt::Task.last - expect(task.zoekt_repository).to eq repo - expect(task.project_identifier).to eq repo.project_identifier - expect(task).to be_index_repo - expect(task.perform_at).to eq perform_at - end - - context 'when project is already deleted' do - let_it_be(:repo_with_deleted_project) { create(:zoekt_repository, zoekt_index: zoekt_index) } - let_it_be(:repo_with_deleted_project2) { create(:zoekt_repository, zoekt_index: zoekt_index) } - - before do - [repo_with_deleted_project.project, repo_with_deleted_project2.project].map(&:destroy!) - end - - it 'creates task with the supplied project_id' do - perform_at = Time.zone.now - expect do - described_class.create_tasks(project_id: repo_with_deleted_project2.project_identifier, - zoekt_index: zoekt_index, task_type: :delete_repo, perform_at: perform_at - ) - end.to change { described_class.count }.by(0).and change { Search::Zoekt::Task.count }.by(1) - task = Search::Zoekt::Task.last - expect(task.zoekt_repository).to eq repo_with_deleted_project2 - expect(task.project_identifier).to eq repo_with_deleted_project2.project_identifier - expect(task).to be_delete_repo - expect(task.perform_at).to eq perform_at - end - end - - context 'when there is already a pending task with the provided zoekt_node_id and task_type' do - before do - create(:zoekt_task, :pending, zoekt_repository: repo, task_type: task_type, node: zoekt_index.node) - end - - it 'does not create the new task' do - expect do - described_class.create_tasks(project_id: repo.project_identifier, zoekt_index: zoekt_index, - task_type: task_type, perform_at: Time.zone.now - ) - end.not_to change { Search::Zoekt::Task.count } - end - end - end - - context 'when repository is in failed state' do - let_it_be(:repo) { create(:zoekt_repository, state: :failed) } - let_it_be(:zoekt_index) { repo.zoekt_index } - - context 'and task_type is index_repo' do - let(:task_type) { :index_repo } - - it 'does not creates task' do - perform_at = Time.zone.now - expect do - described_class.create_tasks(project_id: repo.project_identifier, zoekt_index: zoekt_index, - task_type: task_type, perform_at: perform_at - ) - end.not_to change { described_class.count } - end - end - - context 'and task_type is delete_repo' do - let(:task_type) { :delete_repo } - - it 'creates task' do - perform_at = Time.zone.now - expect do - described_class.create_tasks(project_id: repo.project_identifier, zoekt_index: zoekt_index, - task_type: task_type, perform_at: perform_at - ) - end.to change { described_class.count }.by(0).and change { Search::Zoekt::Task.count }.by(1) - task = Search::Zoekt::Task.last - expect(task.zoekt_repository).to eq repo - expect(task.project_identifier).to eq repo.project_identifier - expect(task).to be_delete_repo - expect(task.perform_at).to eq perform_at - end - end - end - end - describe '.create_bulk_tasks', :freeze_time do let_it_be(:zoekt_repo_with_pending_tasks) { create(:zoekt_repository) } let_it_be(:zoekt_repo_with_processing_tasks) { create(:zoekt_repository) } @@ -236,6 +117,19 @@ end context 'when task_type is index_repo' do + context 'when bulk_insert! raises an exception' do + before do + allow(Search::Zoekt::Task).to receive(:bulk_insert!).and_raise(ActiveRecord::StatementInvalid) + end + + it 'raises the exception' do + initial_zoekt_repo_states = described_class.all.pluck(:state, :id) + expect { described_class.create_bulk_tasks }.to raise_error(ActiveRecord::StatementInvalid) + .and not_change { Search::Zoekt::Task.count } + expect(described_class.all.pluck(:state, :id)).to match_array(initial_zoekt_repo_states) + end + end + it 'does not creates tasks for failed repos and creates tasks for repos which do not have pending tasks' do pending_tasks_count = zoekt_repo_with_pending_tasks.reload.tasks.count processing_tasks_count = zoekt_repo_with_processing_tasks.reload.tasks.count diff --git a/ee/spec/workers/search/zoekt/repo_marked_as_to_delete_event_worker_spec.rb b/ee/spec/workers/search/zoekt/repo_marked_as_to_delete_event_worker_spec.rb index 3bb96c95b769031a63485e97c782f0646ff90791..996c6b4c503e48999f75e66183380537bd72deae 100644 --- a/ee/spec/workers/search/zoekt/repo_marked_as_to_delete_event_worker_spec.rb +++ b/ee/spec/workers/search/zoekt/repo_marked_as_to_delete_event_worker_spec.rb @@ -16,9 +16,6 @@ it_behaves_like 'an idempotent worker' do it 'creates a delete repo task for all repos in the list' do - expect(Search::Zoekt::Repository).to receive(:should_be_deleted).and_return(scope) - expect(scope).to receive(:limit).with(described_class::BATCH_SIZE).and_return(repos) - expect do consume_event(subscriber: described_class, event: event) end.to change { Search::Zoekt::Task.where(task_type: :delete_repo).size }.from(0).to(repos.size) @@ -26,8 +23,6 @@ it 'processes in batches' do stub_const("#{described_class}::BATCH_SIZE", 2) - expect(Search::Zoekt::Repository).to receive(:should_be_deleted).and_return(scope) - expect(scope).to receive(:limit).with(described_class::BATCH_SIZE).and_call_original expect do consume_event(subscriber: described_class, event: event) end.to change { Search::Zoekt::Task.where(task_type: :delete_repo).size }.from(0).to(described_class::BATCH_SIZE)