From 163ad707aef37bca8527c373b0444f6801a48405 Mon Sep 17 00:00:00 2001 From: Arturo Herrero Date: Fri, 11 Sep 2020 20:00:47 +0100 Subject: [PATCH 1/9] Propagate integrations using batching and queues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is an important change in the architecture used to propagate integrations. We can now propagate instance-level integrations and templates using batching and Sidekiq queues. The problem with the previous approach was its performance in the worst-case scenario: when there are no matching records, the anti-join query has to check every candidate row before it can return. With the new approach, each job in the new queues handles a batch of projects/groups, rather than a single job handling all of them. This is what we do right now in the complex case of propagating an instance-level integration: - Update inherited integrations. - Create integration for all projects without integration. - Create integration for all groups without integration. BEFORE: Save integration ↓ ┌┬───────────────────────┬┐ │| Propagate integration |│ └┴───────────────────────┴┘ ↓ Update inherited integrations Create integration for all projects without integration Create integration for all groups without integration AFTER: Save integration ↓ ┌┬───────────────────────┬┐ │| Propagate integration |│ └┴───────────────────────┴┘ ↓ ↓ ┌┬─────────────────────┬┐ ┌┬───────────────────────┬┐ │| Propagate to groups |│ │| Propagate to projects |│ └┴─────────────────────┴┘ └┴───────────────────────┴┘ Update inherited integrations --- app/models/concerns/integration.rb | 4 +- app/models/group.rb | 10 ++ app/models/project.rb | 1 + .../admin/propagate_integration_service.rb | 31 ++--- .../admin/propagate_service_template.rb | 6 - .../bulk_create_integration_service.rb | 59 +++++++++ .../concerns/admin/propagate_service.rb | 49 +------ app/workers/all_queues.yml | 16 +++ .../propagate_integration_group_worker.rb | 18 +++ .../propagate_integration_project_worker.rb | 18 +++ config/sidekiq_queues.yml
| 4 + spec/models/group_spec.rb | 14 ++ spec/models/integration_spec.rb | 10 +- .../propagate_integration_service_spec.rb | 77 +++++------ .../admin/propagate_service_template_spec.rb | 125 +++--------------- .../bulk_create_integration_service_spec.rb | 107 +++++++++++++++ ...propagate_integration_group_worker_spec.rb | 19 +++ ...opagate_integration_project_worker_spec.rb | 19 +++ 18 files changed, 357 insertions(+), 230 deletions(-) create mode 100644 app/services/bulk_create_integration_service.rb create mode 100644 app/workers/propagate_integration_group_worker.rb create mode 100644 app/workers/propagate_integration_project_worker.rb create mode 100644 spec/services/bulk_create_integration_service_spec.rb create mode 100644 spec/workers/propagate_integration_group_worker_spec.rb create mode 100644 spec/workers/propagate_integration_project_worker_spec.rb diff --git a/app/models/concerns/integration.rb b/app/models/concerns/integration.rb index 34ff5bb1195ca1..9d446841a9fcee 100644 --- a/app/models/concerns/integration.rb +++ b/app/models/concerns/integration.rb @@ -16,7 +16,7 @@ def with_custom_integration_for(integration, page = nil, per = nil) Project.where(id: custom_integration_project_ids) end - def ids_without_integration(integration, limit) + def without_integration(integration) services = Service .select('1') .where('services.project_id = projects.id') @@ -26,8 +26,6 @@ def ids_without_integration(integration, limit) .where('NOT EXISTS (?)', services) .where(pending_delete: false) .where(archived: false) - .limit(limit) - .pluck(:id) end end end diff --git a/app/models/group.rb b/app/models/group.rb index c0f145997ccf87..abb3f6c96c4dcc 100644 --- a/app/models/group.rb +++ b/app/models/group.rb @@ -15,6 +15,7 @@ class Group < Namespace include WithUploads include Gitlab::Utils::StrongMemoize include GroupAPICompatibility + include EachBatch ACCESS_REQUEST_APPROVERS_TO_BE_NOTIFIED_LIMIT = 10 @@ -140,6 +141,15 @@ def select_for_project_authorization end 
end + def without_integration(integration) + services = Service + .select('1') + .where('services.group_id = namespaces.id') + .where(type: integration.type) + + where('NOT EXISTS (?)', services) + end + private def public_to_user_arel(user) diff --git a/app/models/project.rb b/app/models/project.rb index dacad903439fe4..8934c09f7b166f 100644 --- a/app/models/project.rb +++ b/app/models/project.rb @@ -33,6 +33,7 @@ class Project < ApplicationRecord include FromUnion include IgnorableColumns include Integration + include EachBatch extend Gitlab::Cache::RequestCache extend Gitlab::ConfigHelper diff --git a/app/services/admin/propagate_integration_service.rb b/app/services/admin/propagate_integration_service.rb index 34d6008cb6a6a8..8ed5b4ee303ac5 100644 --- a/app/services/admin/propagate_integration_service.rb +++ b/app/services/admin/propagate_integration_service.rb @@ -5,10 +5,9 @@ class PropagateIntegrationService include PropagateService def propagate - update_inherited_integrations - create_integration_for_groups_without_integration if Feature.enabled?(:group_level_integrations) create_integration_for_projects_without_integration + update_inherited_integrations end private @@ -33,7 +32,7 @@ def bulk_update_from_integration(batch) Service.transaction do batch.update_all(service_hash) - if data_fields_present? + if integration.data_fields_present? integration.data_fields.class.where(service_id: batch_ids).update_all(data_fields_hash) end end @@ -41,32 +40,18 @@ def bulk_update_from_integration(batch) # rubocop: enable CodeReuse/ActiveRecord def create_integration_for_groups_without_integration - loop do - batch = Group.uncached { group_ids_without_integration(integration, BATCH_SIZE) } - - bulk_create_from_integration(batch, 'group') unless batch.empty? 
- - break if batch.size < BATCH_SIZE + Group.without_integration(integration).each_batch(of: BATCH_SIZE) do |groups| + min_id, max_id = groups.pick("MIN(namespaces.id), MAX(namespaces.id)") + PropagateIntegrationGroupWorker.perform_async(integration.id, min_id, max_id) end end def service_hash - @service_hash ||= integration.to_service_hash - .tap { |json| json['inherit_from_id'] = integration.id } + integration.to_service_hash.tap { |json| json['inherit_from_id'] = integration.id } end - # rubocop:disable CodeReuse/ActiveRecord - def group_ids_without_integration(integration, limit) - services = Service - .select('1') - .where('services.group_id = namespaces.id') - .where(type: integration.type) - - Group - .where('NOT EXISTS (?)', services) - .limit(limit) - .pluck(:id) + def data_fields_hash + integration.to_data_fields_hash end - # rubocop:enable CodeReuse/ActiveRecord end end diff --git a/app/services/admin/propagate_service_template.rb b/app/services/admin/propagate_service_template.rb index cd0d2d5d03fa5e..07be3c1027d0fb 100644 --- a/app/services/admin/propagate_service_template.rb +++ b/app/services/admin/propagate_service_template.rb @@ -9,11 +9,5 @@ def propagate create_integration_for_projects_without_integration end - - private - - def service_hash - @service_hash ||= integration.to_service_hash - end end end diff --git a/app/services/bulk_create_integration_service.rb b/app/services/bulk_create_integration_service.rb new file mode 100644 index 00000000000000..b3b3da367a7a60 --- /dev/null +++ b/app/services/bulk_create_integration_service.rb @@ -0,0 +1,59 @@ +# frozen_string_literal: true + +class BulkCreateIntegrationService + def initialize(integration, batch_ids, association) + @integration = integration + @batch_ids = batch_ids + @association = association + end + + def execute + service_list = ServiceList.new(batch_ids, service_hash, association).to_array + + Service.transaction do + results = bulk_insert(*service_list) + + if 
integration.data_fields_present? + data_list = DataList.new(results, data_fields_hash, integration.data_fields.class).to_array + + bulk_insert(*data_list) + end + + run_callbacks(batch_ids) if association == 'project' + end + end + + private + + attr_reader :integration, :batch_ids, :association + + def bulk_insert(klass, columns, values_array) + items_to_insert = values_array.map { |array| Hash[columns.zip(array)] } + + klass.insert_all(items_to_insert, returning: [:id]) + end + + # rubocop: disable CodeReuse/ActiveRecord + def run_callbacks(batch_ids) + if integration.issue_tracker? + Project.where(id: batch_ids).update_all(has_external_issue_tracker: true) + end + + if integration.type == 'ExternalWikiService' + Project.where(id: batch_ids).update_all(has_external_wiki: true) + end + end + # rubocop: enable CodeReuse/ActiveRecord + + def service_hash + if integration.template? + integration.to_service_hash + else + integration.to_service_hash.tap { |json| json['inherit_from_id'] = integration.id } + end + end + + def data_fields_hash + integration.to_data_fields_hash + end +end diff --git a/app/services/concerns/admin/propagate_service.rb b/app/services/concerns/admin/propagate_service.rb index 974408f678c1b2..fb53dd9727b841 100644 --- a/app/services/concerns/admin/propagate_service.rb +++ b/app/services/concerns/admin/propagate_service.rb @@ -6,8 +6,6 @@ module PropagateService BATCH_SIZE = 100 - delegate :data_fields_present?, to: :integration - class_methods do def propagate(integration) new(integration).propagate @@ -23,51 +21,10 @@ def initialize(integration) attr_reader :integration def create_integration_for_projects_without_integration - loop do - batch_ids = Project.uncached { Project.ids_without_integration(integration, BATCH_SIZE) } - - bulk_create_from_integration(batch_ids, 'project') unless batch_ids.empty? 
- - break if batch_ids.size < BATCH_SIZE - end - end - - def bulk_create_from_integration(batch_ids, association) - service_list = ServiceList.new(batch_ids, service_hash, association).to_array - - Service.transaction do - results = bulk_insert(*service_list) - - if data_fields_present? - data_list = DataList.new(results, data_fields_hash, integration.data_fields.class).to_array - - bulk_insert(*data_list) - end - - run_callbacks(batch_ids) if association == 'project' + Project.without_integration(integration).each_batch(of: BATCH_SIZE) do |projects| + min_id, max_id = projects.pick("MIN(projects.id), MAX(projects.id)") + PropagateIntegrationProjectWorker.perform_async(integration.id, min_id, max_id) end end - - def bulk_insert(klass, columns, values_array) - items_to_insert = values_array.map { |array| Hash[columns.zip(array)] } - - klass.insert_all(items_to_insert, returning: [:id]) - end - - # rubocop: disable CodeReuse/ActiveRecord - def run_callbacks(batch_ids) - if integration.issue_tracker? 
- Project.where(id: batch_ids).update_all(has_external_issue_tracker: true) - end - - if integration.type == 'ExternalWikiService' - Project.where(id: batch_ids).update_all(has_external_wiki: true) - end - end - # rubocop: enable CodeReuse/ActiveRecord - - def data_fields_hash - @data_fields_hash ||= integration.to_data_fields_hash - end end end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index d7d9762b45ceaf..519c999088db2d 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -1716,6 +1716,22 @@ :weight: 1 :idempotent: true :tags: [] +- :name: propagate_integration_group + :feature_category: :integrations + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] +- :name: propagate_integration_project + :feature_category: :integrations + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: propagate_service_template :feature_category: :integrations :has_external_dependencies: diff --git a/app/workers/propagate_integration_group_worker.rb b/app/workers/propagate_integration_group_worker.rb new file mode 100644 index 00000000000000..c9c51f680541d0 --- /dev/null +++ b/app/workers/propagate_integration_group_worker.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +class PropagateIntegrationGroupWorker + include ApplicationWorker + + feature_category :integrations + idempotent! 
+ loggable_arguments 1 + + # rubocop: disable CodeReuse/ActiveRecord + def perform(integration_id, min_id, max_id) + integration = Service.find(integration_id) + batch_ids = Group.where(id: min_id..max_id).without_integration(integration).pluck(:id) + + BulkCreateIntegrationService.new(integration, batch_ids, 'group').execute + end + # rubocop: enable CodeReuse/ActiveRecord +end diff --git a/app/workers/propagate_integration_project_worker.rb b/app/workers/propagate_integration_project_worker.rb new file mode 100644 index 00000000000000..c568daae30876f --- /dev/null +++ b/app/workers/propagate_integration_project_worker.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +class PropagateIntegrationProjectWorker + include ApplicationWorker + + feature_category :integrations + idempotent! + loggable_arguments 1 + + # rubocop: disable CodeReuse/ActiveRecord + def perform(integration_id, min_id, max_id) + integration = Service.find(integration_id) + batch_ids = Project.where(id: min_id..max_id).without_integration(integration).pluck(:id) + + BulkCreateIntegrationService.new(integration, batch_ids, 'project').execute + end + # rubocop: enable CodeReuse/ActiveRecord +end diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 823ec2eddb3afe..455ad7ee8b84af 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -232,6 +232,10 @@ - 1 - - propagate_integration - 1 +- - propagate_integration_group + - 1 +- - propagate_integration_project + - 1 - - propagate_service_template - 1 - - reactive_caching diff --git a/spec/models/group_spec.rb b/spec/models/group_spec.rb index 15972f66fd67ee..f440efdfb072a4 100644 --- a/spec/models/group_spec.rb +++ b/spec/models/group_spec.rb @@ -224,6 +224,20 @@ end end + describe '.without_integration' do + let(:another_group) { create(:group) } + let(:instance_integration) { build(:jira_service, :instance) } + + before do + create(:jira_service, group: group, project: nil) + create(:slack_service, group: 
another_group, project: nil) + end + + it 'returns groups without integration' do + expect(Group.without_integration(instance_integration)).to contain_exactly(another_group) + end + end + describe '.public_or_visible_to_user' do let!(:private_group) { create(:group, :private) } let!(:internal_group) { create(:group, :internal) } diff --git a/spec/models/integration_spec.rb b/spec/models/integration_spec.rb index 87ba0f3f7e6933..e4bb6522689233 100644 --- a/spec/models/integration_spec.rb +++ b/spec/models/integration_spec.rb @@ -11,18 +11,18 @@ before do create(:jira_service, project: project_1, inherit_from_id: instance_integration.id) create(:jira_service, project: project_2, inherit_from_id: nil) - create(:slack_service, project: project_1, inherit_from_id: nil) + create(:slack_service, project: project_3, inherit_from_id: nil) end - describe '#with_custom_integration_for' do + describe '.with_custom_integration_for' do it 'returns projects with custom integrations' do expect(Project.with_custom_integration_for(instance_integration)).to contain_exactly(project_2) end end - describe '#ids_without_integration' do - it 'returns projects ids without an integration' do - expect(Project.ids_without_integration(instance_integration, 100)).to contain_exactly(project_3.id) + describe '.without_integration' do + it 'returns projects without integration' do + expect(Project.without_integration(instance_integration)).to contain_exactly(project_3) end end end diff --git a/spec/services/admin/propagate_integration_service_spec.rb b/spec/services/admin/propagate_integration_service_spec.rb index 49d974b7154251..2c6948ce7fca76 100644 --- a/spec/services/admin/propagate_integration_service_spec.rb +++ b/spec/services/admin/propagate_integration_service_spec.rb @@ -10,9 +10,8 @@ stub_jira_service_test end - let(:excluded_attributes) { %w[id project_id group_id inherit_from_id instance created_at updated_at default] } - let!(:project) { create(:project) } - let!(:group) { 
create(:group) } + let_it_be(:project) { create(:project) } + let(:excluded_attributes) { %w[id project_id group_id inherit_from_id instance template created_at updated_at] } let!(:instance_integration) do JiraService.create!( instance: true, @@ -39,7 +38,7 @@ let!(:not_inherited_integration) do JiraService.create!( - project: create(:project), + project: project, inherit_from_id: nil, instance: false, active: true, @@ -52,7 +51,7 @@ let!(:different_type_inherited_integration) do BambooService.create!( - project: create(:project), + project: project, inherit_from_id: instance_integration.id, instance: false, active: true, @@ -64,8 +63,10 @@ ) end - shared_examples 'inherits settings from integration' do - it 'updates the inherited integrations' do + context 'with inherited integration' do + let(:integration) { inherited_integration } + + it 'updates the integration' do described_class.propagate(instance_integration) expect(integration.reload.inherit_from_id).to eq(instance_integration.id) @@ -73,10 +74,10 @@ .to eq(instance_integration.attributes.except(*excluded_attributes)) end - context 'integration with data fields' do + context 'with integration with data fields' do let(:excluded_attributes) { %w[id service_id created_at updated_at] } - it 'updates the data fields from inherited integrations' do + it 'updates the data fields from the integration' do described_class.propagate(instance_integration) expect(integration.reload.data_fields.attributes.except(*excluded_attributes)) @@ -85,54 +86,44 @@ end end - shared_examples 'does not inherit settings from integration' do - it 'does not update the not inherited integrations' do - described_class.propagate(instance_integration) + context 'with not inherited integration' do + let(:integration) { not_inherited_integration } - expect(integration.reload.attributes.except(*excluded_attributes)) - .not_to eq(instance_integration.attributes.except(*excluded_attributes)) + it 'does not update the integration' do + expect { 
described_class.propagate(instance_integration) } + .not_to change { instance_integration.attributes.except(*excluded_attributes) } end end - context 'update only inherited integrations' do - it_behaves_like 'inherits settings from integration' do - let(:integration) { inherited_integration } - end + context 'with different type inherited integration' do + let(:integration) { different_type_inherited_integration } - it_behaves_like 'does not inherit settings from integration' do - let(:integration) { not_inherited_integration } + it 'does not update the integration' do + expect { described_class.propagate(instance_integration) } + .not_to change { instance_integration.attributes.except(*excluded_attributes) } end + end - it_behaves_like 'does not inherit settings from integration' do - let(:integration) { different_type_inherited_integration } - end + context 'with a project without integration' do + let!(:another_project) { create(:project) } - it_behaves_like 'inherits settings from integration' do - let(:integration) { project.jira_service } - end + it 'calls to PropagateIntegrationProjectWorker' do + expect(PropagateIntegrationProjectWorker).to receive(:perform_async) + .with(instance_integration.id, another_project.id, another_project.id) - it_behaves_like 'inherits settings from integration' do - let(:integration) { Service.find_by(group_id: group.id) } + described_class.propagate(instance_integration) end end - it 'updates project#has_external_issue_tracker for issue tracker services' do - described_class.propagate(instance_integration) + context 'with a group without integration' do + let!(:group) { create(:group) } - expect(project.reload.has_external_issue_tracker).to eq(true) - end + it 'calls to PropagateIntegrationProjectWorker' do + expect(PropagateIntegrationGroupWorker).to receive(:perform_async) + .with(instance_integration.id, group.id, group.id) - it 'updates project#has_external_wiki for external wiki services' do - instance_integration = 
ExternalWikiService.create!( - instance: true, - active: true, - push_events: false, - external_wiki_url: 'http://external-wiki-url.com' - ) - - described_class.propagate(instance_integration) - - expect(project.reload.has_external_wiki).to eq(true) + described_class.propagate(instance_integration) + end end end end diff --git a/spec/services/admin/propagate_service_template_spec.rb b/spec/services/admin/propagate_service_template_spec.rb index 814a9db55de77e..d95d31ceaeacbb 100644 --- a/spec/services/admin/propagate_service_template_spec.rb +++ b/spec/services/admin/propagate_service_template_spec.rb @@ -4,6 +4,7 @@ RSpec.describe Admin::PropagateServiceTemplate do describe '.propagate' do + let_it_be(:project) { create(:project) } let!(:service_template) do PushoverService.create!( template: true, @@ -19,124 +20,40 @@ ) end - let!(:project) { create(:project) } - let(:excluded_attributes) { %w[id project_id template created_at updated_at default] } - - it 'creates services for projects' do - expect(project.pushover_service).to be_nil - - described_class.propagate(service_template) - - expect(project.reload.pushover_service).to be_present - end - - it 'creates services for a project that has another service' do - BambooService.create!( - active: true, - project: project, - properties: { - bamboo_url: 'http://gitlab.com', - username: 'mic', - password: 'password', - build_key: 'build' - } - ) - - expect(project.pushover_service).to be_nil + it 'calls to PropagateIntegrationProjectWorker' do + expect(PropagateIntegrationProjectWorker).to receive(:perform_async) + .with(service_template.id, project.id, project.id) described_class.propagate(service_template) - - expect(project.reload.pushover_service).to be_present end - it 'does not create the service if it exists already' do - other_service = BambooService.create!( - template: true, - active: true, - properties: { - bamboo_url: 'http://gitlab.com', - username: 'mic', - password: 'password', - build_key: 'build' - } 
- ) - - Service.build_from_integration(service_template, project_id: project.id).save! - Service.build_from_integration(other_service, project_id: project.id).save! - - expect { described_class.propagate(service_template) } - .not_to change { Service.count } - end - - it 'creates the service containing the template attributes' do - described_class.propagate(service_template) - - expect(project.pushover_service.properties).to eq(service_template.properties) - - expect(project.pushover_service.attributes.except(*excluded_attributes)) - .to eq(service_template.attributes.except(*excluded_attributes)) - end - - context 'service with data fields' do - include JiraServiceHelper - - let(:service_template) do - stub_jira_service_test - - JiraService.create!( - template: true, + context 'with a project that has another service' do + before do + BambooService.create!( active: true, - push_events: false, - url: 'http://jira.instance.com', - username: 'user', - password: 'secret' + project: project, + properties: { + bamboo_url: 'http://gitlab.com', + username: 'mic', + password: 'password', + build_key: 'build' + } ) end - it 'creates the service containing the template attributes' do - described_class.propagate(service_template) - - expect(project.jira_service.attributes.except(*excluded_attributes)) - .to eq(service_template.attributes.except(*excluded_attributes)) - - excluded_attributes = %w[id service_id created_at updated_at] - expect(project.jira_service.data_fields.attributes.except(*excluded_attributes)) - .to eq(service_template.data_fields.attributes.except(*excluded_attributes)) - end - end - - describe 'bulk update', :use_sql_query_cache do - let(:project_total) { 5 } - - before do - stub_const('Admin::PropagateServiceTemplate::BATCH_SIZE', 3) - - project_total.times { create(:project) } + it 'calls to PropagateIntegrationProjectWorker' do + expect(PropagateIntegrationProjectWorker).to receive(:perform_async) + .with(service_template.id, project.id, project.id) 
described_class.propagate(service_template) end - - it 'creates services for all projects' do - expect(Service.all.reload.count).to eq(project_total + 2) - end end - describe 'external tracker' do - it 'updates the project external tracker' do - service_template.update!(category: 'issue_tracker') - - expect { described_class.propagate(service_template) } - .to change { project.reload.has_external_issue_tracker }.to(true) - end - end - - describe 'external wiki' do - it 'updates the project external tracker' do - service_template.update!(type: 'ExternalWikiService') + it 'does not create the service if it exists already' do + Service.build_from_integration(service_template, project_id: project.id).save! - expect { described_class.propagate(service_template) } - .to change { project.reload.has_external_wiki }.to(true) - end + expect { described_class.propagate(service_template) } + .not_to change { Service.count } end end end diff --git a/spec/services/bulk_create_integration_service_spec.rb b/spec/services/bulk_create_integration_service_spec.rb new file mode 100644 index 00000000000000..4715734d2845fc --- /dev/null +++ b/spec/services/bulk_create_integration_service_spec.rb @@ -0,0 +1,107 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe BulkCreateIntegrationService do + include JiraServiceHelper + + before do + stub_jira_service_test + end + + let(:excluded_attributes) { %w[id project_id group_id inherit_from_id instance template created_at updated_at] } + let!(:instance_integration) { create(:jira_service, :instance) } + let!(:template_integration) { create(:jira_service, :template) } + + shared_examples 'creates integration from batch ids' do + it 'updates the inherited integrations' do + described_class.new(integration, batch_ids, association).execute + + expect(created_integration.attributes.except(*excluded_attributes)) + .to eq(integration.attributes.except(*excluded_attributes)) + end + + context 'integration with data fields' do 
+ let(:excluded_attributes) { %w[id service_id created_at updated_at] } + + it 'updates the data fields from inherited integrations' do + described_class.new(integration, batch_ids, association).execute + + expect(created_integration.reload.data_fields.attributes.except(*excluded_attributes)) + .to eq(integration.data_fields.attributes.except(*excluded_attributes)) + end + end + end + + shared_examples 'updates inherit_from_id' do + it 'updates inherit_from_id attributes' do + described_class.new(integration, batch_ids, association).execute + + expect(created_integration.reload.inherit_from_id).to eq(integration.id) + end + end + + shared_examples 'runs project callbacks' do + it 'updates projects#has_external_issue_tracker for issue tracker services' do + described_class.new(integration, batch_ids, association).execute + + expect(project.reload.has_external_issue_tracker).to eq(true) + end + + context 'with an external wiki integration' do + let(:integration) do + ExternalWikiService.create!( + instance: true, + active: true, + push_events: false, + external_wiki_url: 'http://external-wiki-url.com' + ) + end + + it 'updates projects#has_external_wiki for external wiki services' do + described_class.new(integration, batch_ids, association).execute + + expect(project.reload.has_external_wiki).to eq(true) + end + end + end + + context 'with an instance-level integration' do + let(:integration) { instance_integration } + + context 'with a project association' do + let!(:project) { create(:project) } + let(:created_integration) { project.jira_service } + let(:batch_ids) { [project.id] } + let(:association) { 'project' } + + it_behaves_like 'creates integration from batch ids' + it_behaves_like 'updates inherit_from_id' + it_behaves_like 'runs project callbacks' + end + + context 'with a group association' do + let!(:group) { create(:group) } + let(:created_integration) { Service.find_by(group: group) } + let(:batch_ids) { [group.id] } + let(:association) { 'group' } + 
+ it_behaves_like 'creates integration from batch ids' + it_behaves_like 'updates inherit_from_id' + end + end + + context 'with a template integration' do + let(:integration) { template_integration } + + context 'with a project association' do + let!(:project) { create(:project) } + let(:created_integration) { project.jira_service } + let(:batch_ids) { [project.id] } + let(:association) { 'project' } + + it_behaves_like 'creates integration from batch ids' + it_behaves_like 'runs project callbacks' + end + end +end diff --git a/spec/workers/propagate_integration_group_worker_spec.rb b/spec/workers/propagate_integration_group_worker_spec.rb new file mode 100644 index 00000000000000..b9b1eb66f3333e --- /dev/null +++ b/spec/workers/propagate_integration_group_worker_spec.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe PropagateIntegrationGroupWorker do + describe '#perform' do + let!(:group1) { create(:group) } + let!(:group2) { create(:group) } + let!(:integration) { create(:redmine_service, :instance) } + + it 'calls to BulkCreateIntegrationService' do + expect(BulkCreateIntegrationService).to receive(:new) + .with(integration, [group1.id, group2.id], 'group') + .and_return(double(execute: nil)) + + subject.perform(integration.id, group1.id, group2.id) + end + end +end diff --git a/spec/workers/propagate_integration_project_worker_spec.rb b/spec/workers/propagate_integration_project_worker_spec.rb new file mode 100644 index 00000000000000..be59249ec32d25 --- /dev/null +++ b/spec/workers/propagate_integration_project_worker_spec.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe PropagateIntegrationProjectWorker do + describe '#perform' do + let!(:project1) { create(:project) } + let!(:project2) { create(:project) } + let!(:integration) { create(:redmine_service, :instance) } + + it 'calls to BulkCreateIntegrationService' do + expect(BulkCreateIntegrationService).to 
receive(:new) + .with(integration, [project1.id, project2.id], 'project') + .and_return(double(execute: nil)) + + subject.perform(integration.id, project1.id, project2.id) + end + end +end -- GitLab From 6333bc66842755375ce3ace8c1907aa3b441e2c2 Mon Sep 17 00:00:00 2001 From: Arturo Herrero Date: Thu, 17 Sep 2020 18:09:12 +0100 Subject: [PATCH 2/9] Update index on namespaces for type and id Including the id column in the partial index improves performance. --- ...integrations-using-batching-and-queues.yml | 5 ++++ ...ate_index_on_namespaces_for_type_and_id.rb | 23 +++++++++++++++++++ db/schema_migrations/20200917165525 | 1 + db/structure.sql | 2 +- 4 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 changelogs/unreleased/209831-propagate-integrations-using-batching-and-queues.yml create mode 100644 db/post_migrate/20200917165525_update_index_on_namespaces_for_type_and_id.rb create mode 100644 db/schema_migrations/20200917165525 diff --git a/changelogs/unreleased/209831-propagate-integrations-using-batching-and-queues.yml b/changelogs/unreleased/209831-propagate-integrations-using-batching-and-queues.yml new file mode 100644 index 00000000000000..d46b05c5e9298f --- /dev/null +++ b/changelogs/unreleased/209831-propagate-integrations-using-batching-and-queues.yml @@ -0,0 +1,5 @@ +--- +title: Update database index on namespaces for type and id +merge_request: 42128 +author: +type: other diff --git a/db/post_migrate/20200917165525_update_index_on_namespaces_for_type_and_id.rb b/db/post_migrate/20200917165525_update_index_on_namespaces_for_type_and_id.rb new file mode 100644 index 00000000000000..35b72b4f160dad --- /dev/null +++ b/db/post_migrate/20200917165525_update_index_on_namespaces_for_type_and_id.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +class UpdateIndexOnNamespacesForTypeAndId < ActiveRecord::Migration[6.0] + include Gitlab::Database::MigrationHelpers + + DOWNTIME = false + disable_ddl_transaction! 
+ + OLD_INDEX_NAME = 'index_namespaces_on_type_partial' + NEW_INDEX_NAME = 'index_namespaces_on_type_and_id_partial' + + def up + add_concurrent_index(:namespaces, [:type, :id], where: 'type IS NOT NULL', name: NEW_INDEX_NAME) + + remove_concurrent_index_by_name(:namespaces, OLD_INDEX_NAME) + end + + def down + add_concurrent_index(:namespaces, :type, where: 'type IS NOT NULL', name: OLD_INDEX_NAME) + + remove_concurrent_index_by_name(:namespaces, NEW_INDEX_NAME) + end +end diff --git a/db/schema_migrations/20200917165525 b/db/schema_migrations/20200917165525 new file mode 100644 index 00000000000000..bf01a95ad14d54 --- /dev/null +++ b/db/schema_migrations/20200917165525 @@ -0,0 +1 @@ +0080b9192ba5b4ea3853cfd930d58e10b9619f3d9a54016b574712e5ec2084f6 \ No newline at end of file diff --git a/db/structure.sql b/db/structure.sql index b63f996487f6dc..520a780474466d 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -20596,7 +20596,7 @@ CREATE UNIQUE INDEX index_namespaces_on_runners_token_encrypted ON namespaces US CREATE INDEX index_namespaces_on_shared_and_extra_runners_minutes_limit ON namespaces USING btree (shared_runners_minutes_limit, extra_shared_runners_minutes_limit); -CREATE INDEX index_namespaces_on_type_partial ON namespaces USING btree (type) WHERE (type IS NOT NULL); +CREATE INDEX index_namespaces_on_type_and_id_partial ON namespaces USING btree (type, id) WHERE (type IS NOT NULL); CREATE INDEX index_non_requested_project_members_on_source_id_and_type ON members USING btree (source_id, source_type) WHERE ((requested_at IS NULL) AND ((type)::text = 'ProjectMember'::text)); -- GitLab From 1336ebeafa0ebc7b3b990128106e686ad039d88a Mon Sep 17 00:00:00 2001 From: Arturo Herrero Date: Fri, 18 Sep 2020 17:12:15 +0100 Subject: [PATCH 3/9] Update integrations using batching and queue This updates all inherited integrations using the same batching and Sidekiq queue approach that we use when bulk-creating project and group integrations.
--- app/models/service.rb | 1 + .../admin/propagate_integration_service.rb | 35 ++---------- .../bulk_update_integration_service.rb | 37 ++++++++++++ app/workers/all_queues.yml | 8 +++ .../propagate_integration_inherit_worker.rb | 18 ++++++ config/sidekiq_queues.yml | 2 + .../propagate_integration_service_spec.rb | 39 ++----------- .../bulk_update_integration_service_spec.rb | 57 +++++++++++++++++++ ...opagate_integration_inherit_worker_spec.rb | 20 +++++++ 9 files changed, 152 insertions(+), 65 deletions(-) create mode 100644 app/services/bulk_update_integration_service.rb create mode 100644 app/workers/propagate_integration_inherit_worker.rb create mode 100644 spec/services/bulk_update_integration_service_spec.rb create mode 100644 spec/workers/propagate_integration_inherit_worker_spec.rb diff --git a/app/models/service.rb b/app/models/service.rb index 087985b192ad96..48fa62d3d46a98 100644 --- a/app/models/service.rb +++ b/app/models/service.rb @@ -63,6 +63,7 @@ class Service < ApplicationRecord scope :active, -> { where(active: true) } scope :by_type, -> (type) { where(type: type) } scope :by_active_flag, -> (flag) { where(active: flag) } + scope :inherit_from_id, -> (id) { where(inherit_from_id: id) } scope :for_group, -> (group) { where(group_id: group, type: available_services_types) } scope :for_template, -> { where(template: true, type: available_services_types) } scope :for_instance, -> { where(instance: true, type: available_services_types) } diff --git a/app/services/admin/propagate_integration_service.rb b/app/services/admin/propagate_integration_service.rb index 8ed5b4ee303ac5..80e27c21d5b975 100644 --- a/app/services/admin/propagate_integration_service.rb +++ b/app/services/admin/propagate_integration_service.rb @@ -5,39 +5,22 @@ class PropagateIntegrationService include PropagateService def propagate + update_inherited_integrations + create_integration_for_groups_without_integration if Feature.enabled?(:group_level_integrations) 
create_integration_for_projects_without_integration - update_inherited_integrations end private # rubocop: disable Cop/InBatches - # rubocop: disable CodeReuse/ActiveRecord def update_inherited_integrations - Service.where(type: integration.type, inherit_from_id: integration.id).in_batches(of: BATCH_SIZE) do |batch| - bulk_update_from_integration(batch) + Service.by_type(integration.type).inherit_from_id(integration.id).in_batches(of: BATCH_SIZE) do |services| + min_id, max_id = services.pick("MIN(services.id), MAX(services.id)") + PropagateIntegrationInheritWorker.perform_async(integration.id, min_id, max_id) end end # rubocop: enable Cop/InBatches - # rubocop: enable CodeReuse/ActiveRecord - - # rubocop: disable CodeReuse/ActiveRecord - def bulk_update_from_integration(batch) - # Retrieving the IDs instantiates the ActiveRecord relation (batch) - # into concrete models, otherwise update_all will clear the relation. - # https://stackoverflow.com/q/34811646/462015 - batch_ids = batch.pluck(:id) - - Service.transaction do - batch.update_all(service_hash) - - if integration.data_fields_present? 
- integration.data_fields.class.where(service_id: batch_ids).update_all(data_fields_hash) - end - end - end - # rubocop: enable CodeReuse/ActiveRecord def create_integration_for_groups_without_integration Group.without_integration(integration).each_batch(of: BATCH_SIZE) do |groups| @@ -45,13 +28,5 @@ def create_integration_for_groups_without_integration PropagateIntegrationGroupWorker.perform_async(integration.id, min_id, max_id) end end - - def service_hash - integration.to_service_hash.tap { |json| json['inherit_from_id'] = integration.id } - end - - def data_fields_hash - integration.to_data_fields_hash - end end end diff --git a/app/services/bulk_update_integration_service.rb b/app/services/bulk_update_integration_service.rb new file mode 100644 index 00000000000000..a4aef40c16f4a4 --- /dev/null +++ b/app/services/bulk_update_integration_service.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +class BulkUpdateIntegrationService + def initialize(integration, batch) + @integration = integration + @batch = batch + end + + # rubocop: disable CodeReuse/ActiveRecord + def execute + # Retrieving the IDs instantiates the ActiveRecord relation (batch) + # into concrete models, otherwise update_all will clear the relation. + # https://stackoverflow.com/q/34811646/462015 + batch_ids = batch.pluck(:id) + + Service.transaction do + batch.update_all(service_hash) + + if integration.data_fields_present? 
+ integration.data_fields.class.where(service_id: batch_ids).update_all(data_fields_hash) + end + end + end + # rubocop: enable CodeReuse/ActiveRecord + + private + + attr_reader :integration, :batch + + def service_hash + integration.to_service_hash.tap { |json| json['inherit_from_id'] = integration.id } + end + + def data_fields_hash + integration.to_data_fields_hash + end +end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 519c999088db2d..f564bf7df3d48b 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -1724,6 +1724,14 @@ :weight: 1 :idempotent: true :tags: [] +- :name: propagate_integration_inherit + :feature_category: :integrations + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: propagate_integration_project :feature_category: :integrations :has_external_dependencies: diff --git a/app/workers/propagate_integration_inherit_worker.rb b/app/workers/propagate_integration_inherit_worker.rb new file mode 100644 index 00000000000000..ee67e2d4c0fbcc --- /dev/null +++ b/app/workers/propagate_integration_inherit_worker.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +class PropagateIntegrationInheritWorker + include ApplicationWorker + + feature_category :integrations + idempotent! 
+ loggable_arguments 1 + + # rubocop: disable CodeReuse/ActiveRecord + def perform(integration_id, min_id, max_id) + integration = Service.find(integration_id) + services = Service.where(id: min_id..max_id).by_type(integration.type).inherit_from_id(integration.id) + + BulkUpdateIntegrationService.new(integration, services).execute + end + # rubocop: enable CodeReuse/ActiveRecord +end diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 455ad7ee8b84af..d61df219fe8964 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -234,6 +234,8 @@ - 1 - - propagate_integration_group - 1 +- - propagate_integration_inherit + - 1 - - propagate_integration_project - 1 - - propagate_service_template diff --git a/spec/services/admin/propagate_integration_service_spec.rb b/spec/services/admin/propagate_integration_service_spec.rb index 2c6948ce7fca76..5dfe39aebbcde5 100644 --- a/spec/services/admin/propagate_integration_service_spec.rb +++ b/spec/services/admin/propagate_integration_service_spec.rb @@ -11,7 +11,6 @@ end let_it_be(:project) { create(:project) } - let(:excluded_attributes) { %w[id project_id group_id inherit_from_id instance template created_at updated_at] } let!(:instance_integration) do JiraService.create!( instance: true, @@ -66,41 +65,11 @@ context 'with inherited integration' do let(:integration) { inherited_integration } - it 'updates the integration' do - described_class.propagate(instance_integration) - - expect(integration.reload.inherit_from_id).to eq(instance_integration.id) - expect(integration.attributes.except(*excluded_attributes)) - .to eq(instance_integration.attributes.except(*excluded_attributes)) - end - - context 'with integration with data fields' do - let(:excluded_attributes) { %w[id service_id created_at updated_at] } - - it 'updates the data fields from the integration' do - described_class.propagate(instance_integration) - - expect(integration.reload.data_fields.attributes.except(*excluded_attributes)) 
- .to eq(instance_integration.data_fields.attributes.except(*excluded_attributes)) - end - end - end - - context 'with not inherited integration' do - let(:integration) { not_inherited_integration } - - it 'does not update the integration' do - expect { described_class.propagate(instance_integration) } - .not_to change { instance_integration.attributes.except(*excluded_attributes) } - end - end - - context 'with different type inherited integration' do - let(:integration) { different_type_inherited_integration } + it 'calls to PropagateIntegrationProjectWorker' do + expect(PropagateIntegrationInheritWorker).to receive(:perform_async) + .with(instance_integration.id, inherited_integration.id, inherited_integration.id) - it 'does not update the integration' do - expect { described_class.propagate(instance_integration) } - .not_to change { instance_integration.attributes.except(*excluded_attributes) } + described_class.propagate(instance_integration) end end diff --git a/spec/services/bulk_update_integration_service_spec.rb b/spec/services/bulk_update_integration_service_spec.rb new file mode 100644 index 00000000000000..2f0bfd3160098b --- /dev/null +++ b/spec/services/bulk_update_integration_service_spec.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe BulkUpdateIntegrationService do + include JiraServiceHelper + + before do + stub_jira_service_test + end + + let(:excluded_attributes) { %w[id project_id group_id inherit_from_id instance template created_at updated_at] } + let!(:instance_integration) do + JiraService.create!( + instance: true, + active: true, + push_events: true, + url: 'http://update-jira.instance.com', + username: 'user', + password: 'secret' + ) + end + + let!(:integration) do + JiraService.create!( + project: create(:project), + inherit_from_id: instance_integration.id, + instance: false, + active: true, + push_events: false, + url: 'http://jira.instance.com', + username: 'user', + password: 'secret' + 
) + end + + context 'with inherited integration' do + it 'updates the integration' do + described_class.new(instance_integration, Service.inherit_from_id(instance_integration.id)).execute + + expect(integration.reload.inherit_from_id).to eq(instance_integration.id) + expect(integration.attributes.except(*excluded_attributes)) + .to eq(instance_integration.attributes.except(*excluded_attributes)) + end + + context 'with integration with data fields' do + let(:excluded_attributes) { %w[id service_id created_at updated_at] } + + it 'updates the data fields from the integration' do + described_class.new(instance_integration, Service.inherit_from_id(instance_integration.id)).execute + + expect(integration.reload.data_fields.attributes.except(*excluded_attributes)) + .to eq(instance_integration.data_fields.attributes.except(*excluded_attributes)) + end + end + end +end diff --git a/spec/workers/propagate_integration_inherit_worker_spec.rb b/spec/workers/propagate_integration_inherit_worker_spec.rb new file mode 100644 index 00000000000000..b8753892edca10 --- /dev/null +++ b/spec/workers/propagate_integration_inherit_worker_spec.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe PropagateIntegrationInheritWorker do + describe '#perform' do + let(:integration) { create(:redmine_service, :instance) } + let!(:integration1) { create(:redmine_service, inherit_from_id: integration.id) } + let!(:integration2) { create(:bugzilla_service, inherit_from_id: integration.id) } + let!(:integration3) { create(:redmine_service) } + + it 'calls to BulkCreateIntegrationService' do + expect(BulkUpdateIntegrationService).to receive(:new) + .with(integration, match_array(integration1)) + .and_return(double(execute: nil)) + + subject.perform(integration.id, integration1.id, integration3.id) + end + end +end -- GitLab From 45fcb25632e48caca27c7b31f54d1ff6efa7325e Mon Sep 17 00:00:00 2001 From: Arturo Herrero Date: Mon, 21 Sep 2020 18:16:00 +0100 
Subject: [PATCH 4/9] Return if the integration is no longer found This change makes the workers idempotent as it doesn't raise any error if the integration is no longer found. --- app/workers/propagate_integration_group_worker.rb | 4 +++- app/workers/propagate_integration_inherit_worker.rb | 4 +++- app/workers/propagate_integration_project_worker.rb | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/app/workers/propagate_integration_group_worker.rb b/app/workers/propagate_integration_group_worker.rb index c9c51f680541d0..e6ca86635a443c 100644 --- a/app/workers/propagate_integration_group_worker.rb +++ b/app/workers/propagate_integration_group_worker.rb @@ -9,7 +9,9 @@ class PropagateIntegrationGroupWorker # rubocop: disable CodeReuse/ActiveRecord def perform(integration_id, min_id, max_id) - integration = Service.find(integration_id) + integration = Service.find_by_id(integration_id) + return unless integration + batch_ids = Group.where(id: min_id..max_id).without_integration(integration).pluck(:id) BulkCreateIntegrationService.new(integration, batch_ids, 'group').execute diff --git a/app/workers/propagate_integration_inherit_worker.rb b/app/workers/propagate_integration_inherit_worker.rb index ee67e2d4c0fbcc..316726d1363443 100644 --- a/app/workers/propagate_integration_inherit_worker.rb +++ b/app/workers/propagate_integration_inherit_worker.rb @@ -9,7 +9,9 @@ class PropagateIntegrationInheritWorker # rubocop: disable CodeReuse/ActiveRecord def perform(integration_id, min_id, max_id) - integration = Service.find(integration_id) + integration = Service.find_by_id(integration_id) + return unless integration + services = Service.where(id: min_id..max_id).by_type(integration.type).inherit_from_id(integration.id) BulkUpdateIntegrationService.new(integration, services).execute diff --git a/app/workers/propagate_integration_project_worker.rb b/app/workers/propagate_integration_project_worker.rb index c568daae30876f..675c745ae0f3ee 100644 --- 
a/app/workers/propagate_integration_project_worker.rb +++ b/app/workers/propagate_integration_project_worker.rb @@ -9,7 +9,9 @@ class PropagateIntegrationProjectWorker # rubocop: disable CodeReuse/ActiveRecord def perform(integration_id, min_id, max_id) - integration = Service.find(integration_id) + integration = Service.find_by_id(integration_id) + return unless integration + batch_ids = Project.where(id: min_id..max_id).without_integration(integration).pluck(:id) BulkCreateIntegrationService.new(integration, batch_ids, 'project').execute -- GitLab From 45b1b6a1f553ca6c0f65318d99645ee59337308e Mon Sep 17 00:00:00 2001 From: Arturo Herrero Date: Mon, 21 Sep 2020 19:48:21 +0100 Subject: [PATCH 5/9] Remove unnecessary retrieving of IDs --- app/services/bulk_update_integration_service.rb | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/app/services/bulk_update_integration_service.rb b/app/services/bulk_update_integration_service.rb index a4aef40c16f4a4..74d77618f2cb9b 100644 --- a/app/services/bulk_update_integration_service.rb +++ b/app/services/bulk_update_integration_service.rb @@ -8,16 +8,11 @@ def initialize(integration, batch) # rubocop: disable CodeReuse/ActiveRecord def execute - # Retrieving the IDs instantiates the ActiveRecord relation (batch) - # into concrete models, otherwise update_all will clear the relation. - # https://stackoverflow.com/q/34811646/462015 - batch_ids = batch.pluck(:id) - Service.transaction do batch.update_all(service_hash) if integration.data_fields_present? - integration.data_fields.class.where(service_id: batch_ids).update_all(data_fields_hash) + integration.data_fields.class.where(service_id: batch.select(:id)).update_all(data_fields_hash) end end end -- GitLab From 847e5a5c49a329c1b3f148f35de565f43034fa0b Mon Sep 17 00:00:00 2001 From: Arturo Herrero Date: Tue, 22 Sep 2020 12:43:14 +0100 Subject: [PATCH 6/9] Pass ActiveRecord::Relation to use sub-queries We don't need to pluck the set of ids into memory. 
We can pass the ActiveRecord::Relation and use sub-queries. --- app/models/data_list.rb | 10 +++++----- app/models/service_list.rb | 12 ++++++------ app/services/bulk_create_integration_service.rb | 16 ++++++++-------- .../propagate_integration_group_worker.rb | 4 ++-- .../propagate_integration_project_worker.rb | 4 ++-- .../bulk_create_integration_service_spec.rb | 16 ++++++++-------- .../propagate_integration_group_worker_spec.rb | 2 +- .../propagate_integration_project_worker_spec.rb | 2 +- 8 files changed, 33 insertions(+), 33 deletions(-) diff --git a/app/models/data_list.rb b/app/models/data_list.rb index 2cee34478861df..adad8e3013ecb5 100644 --- a/app/models/data_list.rb +++ b/app/models/data_list.rb @@ -1,8 +1,8 @@ # frozen_string_literal: true class DataList - def initialize(batch_ids, data_fields_hash, klass) - @batch_ids = batch_ids + def initialize(batch, data_fields_hash, klass) + @batch = batch @data_fields_hash = data_fields_hash @klass = klass end @@ -13,15 +13,15 @@ def to_array private - attr_reader :batch_ids, :data_fields_hash, :klass + attr_reader :batch, :data_fields_hash, :klass def columns data_fields_hash.keys << 'service_id' end def values - batch_ids.map do |row| - data_fields_hash.values << row['id'] + batch.map do |record| + data_fields_hash.values << record['id'] end end end diff --git a/app/models/service_list.rb b/app/models/service_list.rb index 9cbc5e680591ad..5eca5f2bda1442 100644 --- a/app/models/service_list.rb +++ b/app/models/service_list.rb @@ -1,8 +1,8 @@ # frozen_string_literal: true class ServiceList - def initialize(batch_ids, service_hash, association) - @batch_ids = batch_ids + def initialize(batch, service_hash, association) + @batch = batch @service_hash = service_hash @association = association end @@ -13,15 +13,15 @@ def to_array private - attr_reader :batch_ids, :service_hash, :association + attr_reader :batch, :service_hash, :association def columns - (service_hash.keys << "#{association}_id") + service_hash.keys 
<< "#{association}_id" end def values - batch_ids.map do |id| - (service_hash.values << id) + batch.select(:id).map do |record| + service_hash.values << record.id end end end diff --git a/app/services/bulk_create_integration_service.rb b/app/services/bulk_create_integration_service.rb index b3b3da367a7a60..23b89b0d8a9db3 100644 --- a/app/services/bulk_create_integration_service.rb +++ b/app/services/bulk_create_integration_service.rb @@ -1,14 +1,14 @@ # frozen_string_literal: true class BulkCreateIntegrationService - def initialize(integration, batch_ids, association) + def initialize(integration, batch, association) @integration = integration - @batch_ids = batch_ids + @batch = batch @association = association end def execute - service_list = ServiceList.new(batch_ids, service_hash, association).to_array + service_list = ServiceList.new(batch, service_hash, association).to_array Service.transaction do results = bulk_insert(*service_list) @@ -19,13 +19,13 @@ def execute bulk_insert(*data_list) end - run_callbacks(batch_ids) if association == 'project' + run_callbacks(batch) if association == 'project' end end private - attr_reader :integration, :batch_ids, :association + attr_reader :integration, :batch, :association def bulk_insert(klass, columns, values_array) items_to_insert = values_array.map { |array| Hash[columns.zip(array)] } @@ -34,13 +34,13 @@ def bulk_insert(klass, columns, values_array) end # rubocop: disable CodeReuse/ActiveRecord - def run_callbacks(batch_ids) + def run_callbacks(batch) if integration.issue_tracker? 
- Project.where(id: batch_ids).update_all(has_external_issue_tracker: true) + Project.where(id: batch.select(:id)).update_all(has_external_issue_tracker: true) end if integration.type == 'ExternalWikiService' - Project.where(id: batch_ids).update_all(has_external_wiki: true) + Project.where(id: batch.select(:id)).update_all(has_external_wiki: true) end end # rubocop: enable CodeReuse/ActiveRecord diff --git a/app/workers/propagate_integration_group_worker.rb b/app/workers/propagate_integration_group_worker.rb index e6ca86635a443c..5967cdacc9b57d 100644 --- a/app/workers/propagate_integration_group_worker.rb +++ b/app/workers/propagate_integration_group_worker.rb @@ -12,9 +12,9 @@ def perform(integration_id, min_id, max_id) integration = Service.find_by_id(integration_id) return unless integration - batch_ids = Group.where(id: min_id..max_id).without_integration(integration).pluck(:id) + batch = Group.where(id: min_id..max_id).without_integration(integration) - BulkCreateIntegrationService.new(integration, batch_ids, 'group').execute + BulkCreateIntegrationService.new(integration, batch, 'group').execute end # rubocop: enable CodeReuse/ActiveRecord end diff --git a/app/workers/propagate_integration_project_worker.rb b/app/workers/propagate_integration_project_worker.rb index 675c745ae0f3ee..67c1e36d4c5a1a 100644 --- a/app/workers/propagate_integration_project_worker.rb +++ b/app/workers/propagate_integration_project_worker.rb @@ -12,9 +12,9 @@ def perform(integration_id, min_id, max_id) integration = Service.find_by_id(integration_id) return unless integration - batch_ids = Project.where(id: min_id..max_id).without_integration(integration).pluck(:id) + batch = Project.where(id: min_id..max_id).without_integration(integration) - BulkCreateIntegrationService.new(integration, batch_ids, 'project').execute + BulkCreateIntegrationService.new(integration, batch, 'project').execute end # rubocop: enable CodeReuse/ActiveRecord end diff --git 
a/spec/services/bulk_create_integration_service_spec.rb b/spec/services/bulk_create_integration_service_spec.rb index 4715734d2845fc..5d896f78b351e2 100644 --- a/spec/services/bulk_create_integration_service_spec.rb +++ b/spec/services/bulk_create_integration_service_spec.rb @@ -15,7 +15,7 @@ shared_examples 'creates integration from batch ids' do it 'updates the inherited integrations' do - described_class.new(integration, batch_ids, association).execute + described_class.new(integration, batch, association).execute expect(created_integration.attributes.except(*excluded_attributes)) .to eq(integration.attributes.except(*excluded_attributes)) @@ -25,7 +25,7 @@ let(:excluded_attributes) { %w[id service_id created_at updated_at] } it 'updates the data fields from inherited integrations' do - described_class.new(integration, batch_ids, association).execute + described_class.new(integration, batch, association).execute expect(created_integration.reload.data_fields.attributes.except(*excluded_attributes)) .to eq(integration.data_fields.attributes.except(*excluded_attributes)) @@ -35,7 +35,7 @@ shared_examples 'updates inherit_from_id' do it 'updates inherit_from_id attributes' do - described_class.new(integration, batch_ids, association).execute + described_class.new(integration, batch, association).execute expect(created_integration.reload.inherit_from_id).to eq(integration.id) end @@ -43,7 +43,7 @@ shared_examples 'runs project callbacks' do it 'updates projects#has_external_issue_tracker for issue tracker services' do - described_class.new(integration, batch_ids, association).execute + described_class.new(integration, batch, association).execute expect(project.reload.has_external_issue_tracker).to eq(true) end @@ -59,7 +59,7 @@ end it 'updates projects#has_external_wiki for external wiki services' do - described_class.new(integration, batch_ids, association).execute + described_class.new(integration, batch, association).execute 
expect(project.reload.has_external_wiki).to eq(true) end @@ -72,7 +72,7 @@ context 'with a project association' do let!(:project) { create(:project) } let(:created_integration) { project.jira_service } - let(:batch_ids) { [project.id] } + let(:batch) { Project.all } let(:association) { 'project' } it_behaves_like 'creates integration from batch ids' @@ -83,7 +83,7 @@ context 'with a group association' do let!(:group) { create(:group) } let(:created_integration) { Service.find_by(group: group) } - let(:batch_ids) { [group.id] } + let(:batch) { Group.all } let(:association) { 'group' } it_behaves_like 'creates integration from batch ids' @@ -97,7 +97,7 @@ context 'with a project association' do let!(:project) { create(:project) } let(:created_integration) { project.jira_service } - let(:batch_ids) { [project.id] } + let(:batch) { Project.all } let(:association) { 'project' } it_behaves_like 'creates integration from batch ids' diff --git a/spec/workers/propagate_integration_group_worker_spec.rb b/spec/workers/propagate_integration_group_worker_spec.rb index b9b1eb66f3333e..30be9374db0481 100644 --- a/spec/workers/propagate_integration_group_worker_spec.rb +++ b/spec/workers/propagate_integration_group_worker_spec.rb @@ -10,7 +10,7 @@ it 'calls to BulkCreateIntegrationService' do expect(BulkCreateIntegrationService).to receive(:new) - .with(integration, [group1.id, group2.id], 'group') + .with(integration, match_array([group1, group2]), 'group') .and_return(double(execute: nil)) subject.perform(integration.id, group1.id, group2.id) diff --git a/spec/workers/propagate_integration_project_worker_spec.rb b/spec/workers/propagate_integration_project_worker_spec.rb index be59249ec32d25..e5ea47854ba5bb 100644 --- a/spec/workers/propagate_integration_project_worker_spec.rb +++ b/spec/workers/propagate_integration_project_worker_spec.rb @@ -10,7 +10,7 @@ it 'calls to BulkCreateIntegrationService' do expect(BulkCreateIntegrationService).to receive(:new) - .with(integration, 
[project1.id, project2.id], 'project') + .with(integration, match_array([project1, project2]), 'project') .and_return(double(execute: nil)) subject.perform(integration.id, project1.id, project2.id) -- GitLab From 422951257bd84117512c9847aac77ed1cf322fb0 Mon Sep 17 00:00:00 2001 From: Arturo Herrero Date: Wed, 23 Sep 2020 12:17:07 +0100 Subject: [PATCH 7/9] Test idempotency of propagation workers Use the 'an idempotent worker' shared example to test the idempotency of the workers. --- ...propagate_integration_group_worker_spec.rb | 30 +++++++++++++---- ...opagate_integration_inherit_worker_spec.rb | 32 +++++++++++++++---- ...opagate_integration_project_worker_spec.rb | 30 +++++++++++++---- 3 files changed, 73 insertions(+), 19 deletions(-) diff --git a/spec/workers/propagate_integration_group_worker_spec.rb b/spec/workers/propagate_integration_group_worker_spec.rb index 30be9374db0481..b3c9255db577f3 100644 --- a/spec/workers/propagate_integration_group_worker_spec.rb +++ b/spec/workers/propagate_integration_group_worker_spec.rb @@ -4,16 +4,34 @@ RSpec.describe PropagateIntegrationGroupWorker do describe '#perform' do - let!(:group1) { create(:group) } - let!(:group2) { create(:group) } - let!(:integration) { create(:redmine_service, :instance) } + let_it_be(:group1) { create(:group) } + let_it_be(:group2) { create(:group) } + let_it_be(:integration) { create(:redmine_service, :instance) } - it 'calls to BulkCreateIntegrationService' do - expect(BulkCreateIntegrationService).to receive(:new) + before do + allow(BulkCreateIntegrationService).to receive(:new) .with(integration, match_array([group1, group2]), 'group') .and_return(double(execute: nil)) + end + + it_behaves_like 'an idempotent worker' do + let(:job_args) { [integration.id, group1.id, group2.id] } + + it 'calls to BulkCreateIntegrationService' do + expect(BulkCreateIntegrationService).to receive(:new) + .with(integration, match_array([group1, group2]), 'group') + .and_return(double(execute: nil)) + + subject
+ end + end + + context 'with an invalid integration id' do + it 'returns without failure' do + expect(BulkCreateIntegrationService).not_to receive(:new) - subject.perform(integration.id, group1.id, group2.id) + subject.perform(0, group1.id, group2.id) + end end end end diff --git a/spec/workers/propagate_integration_inherit_worker_spec.rb b/spec/workers/propagate_integration_inherit_worker_spec.rb index b8753892edca10..88946b9926ec74 100644 --- a/spec/workers/propagate_integration_inherit_worker_spec.rb +++ b/spec/workers/propagate_integration_inherit_worker_spec.rb @@ -4,17 +4,35 @@ RSpec.describe PropagateIntegrationInheritWorker do describe '#perform' do - let(:integration) { create(:redmine_service, :instance) } - let!(:integration1) { create(:redmine_service, inherit_from_id: integration.id) } - let!(:integration2) { create(:bugzilla_service, inherit_from_id: integration.id) } - let!(:integration3) { create(:redmine_service) } + let_it_be(:integration) { create(:redmine_service, :instance) } + let_it_be(:integration1) { create(:redmine_service, inherit_from_id: integration.id) } + let_it_be(:integration2) { create(:bugzilla_service, inherit_from_id: integration.id) } + let_it_be(:integration3) { create(:redmine_service) } - it 'calls to BulkCreateIntegrationService' do - expect(BulkUpdateIntegrationService).to receive(:new) + before do + allow(BulkUpdateIntegrationService).to receive(:new) .with(integration, match_array(integration1)) .and_return(double(execute: nil)) + end + + it_behaves_like 'an idempotent worker' do + let(:job_args) { [integration.id, integration1.id, integration3.id] } + + it 'calls to BulkCreateIntegrationService' do + expect(BulkUpdateIntegrationService).to receive(:new) + .with(integration, match_array(integration1)) + .and_return(double(execute: nil)) + + subject + end + end + + context 'with an invalid integration id' do + it 'returns without failure' do + expect(BulkUpdateIntegrationService).not_to receive(:new) - 
subject.perform(integration.id, integration1.id, integration3.id) + subject.perform(0, integration1.id, integration3.id) + end end end end diff --git a/spec/workers/propagate_integration_project_worker_spec.rb b/spec/workers/propagate_integration_project_worker_spec.rb index e5ea47854ba5bb..3742ce5fde8667 100644 --- a/spec/workers/propagate_integration_project_worker_spec.rb +++ b/spec/workers/propagate_integration_project_worker_spec.rb @@ -4,16 +4,34 @@ RSpec.describe PropagateIntegrationProjectWorker do describe '#perform' do - let!(:project1) { create(:project) } - let!(:project2) { create(:project) } - let!(:integration) { create(:redmine_service, :instance) } + let_it_be(:project1) { create(:project) } + let_it_be(:project2) { create(:project) } + let_it_be(:integration) { create(:redmine_service, :instance) } - it 'calls to BulkCreateIntegrationService' do - expect(BulkCreateIntegrationService).to receive(:new) + before do + allow(BulkCreateIntegrationService).to receive(:new) .with(integration, match_array([project1, project2]), 'project') .and_return(double(execute: nil)) + end + + it_behaves_like 'an idempotent worker' do + let(:job_args) { [integration.id, project1.id, project2.id] } + + it 'calls to BulkCreateIntegrationService' do + expect(BulkCreateIntegrationService).to receive(:new) + .with(integration, match_array([project1, project2]), 'project') + .and_return(double(execute: nil)) + + subject + end + end + + context 'with an invalid integration id' do + it 'returns without failure' do + expect(BulkCreateIntegrationService).not_to receive(:new) - subject.perform(integration.id, project1.id, project2.id) + subject.perform(0, project1.id, project2.id) + end end end end -- GitLab From 0b2c94b76acfebe3732f286404165b80454c12a7 Mon Sep 17 00:00:00 2001 From: Arturo Herrero Date: Fri, 25 Sep 2020 18:36:01 +0100 Subject: [PATCH 8/9] Remove loggable_arguments override We can remove loggable_arguments given that the arguments are numeric. 
--- app/workers/propagate_integration_group_worker.rb | 1 - app/workers/propagate_integration_inherit_worker.rb | 1 - app/workers/propagate_integration_project_worker.rb | 1 - 3 files changed, 3 deletions(-) diff --git a/app/workers/propagate_integration_group_worker.rb b/app/workers/propagate_integration_group_worker.rb index 5967cdacc9b57d..e539c6d4719424 100644 --- a/app/workers/propagate_integration_group_worker.rb +++ b/app/workers/propagate_integration_group_worker.rb @@ -5,7 +5,6 @@ class PropagateIntegrationGroupWorker feature_category :integrations idempotent! - loggable_arguments 1 # rubocop: disable CodeReuse/ActiveRecord def perform(integration_id, min_id, max_id) diff --git a/app/workers/propagate_integration_inherit_worker.rb b/app/workers/propagate_integration_inherit_worker.rb index 316726d1363443..ef3132202f654b 100644 --- a/app/workers/propagate_integration_inherit_worker.rb +++ b/app/workers/propagate_integration_inherit_worker.rb @@ -5,7 +5,6 @@ class PropagateIntegrationInheritWorker feature_category :integrations idempotent! - loggable_arguments 1 # rubocop: disable CodeReuse/ActiveRecord def perform(integration_id, min_id, max_id) diff --git a/app/workers/propagate_integration_project_worker.rb b/app/workers/propagate_integration_project_worker.rb index 67c1e36d4c5a1a..c1e286b24fcb6b 100644 --- a/app/workers/propagate_integration_project_worker.rb +++ b/app/workers/propagate_integration_project_worker.rb @@ -5,7 +5,6 @@ class PropagateIntegrationProjectWorker feature_category :integrations idempotent! - loggable_arguments 1 # rubocop: disable CodeReuse/ActiveRecord def perform(integration_id, min_id, max_id) -- GitLab From dffda56ad87a0612a65034772c5d288c705144b5 Mon Sep 17 00:00:00 2001 From: Arturo Herrero Date: Mon, 28 Sep 2020 14:05:45 +0100 Subject: [PATCH 9/9] Increase batch size to 10_000 update_all and insert_all are efficient enough to handle batches of 10,000 records in a single statement.
--- app/services/concerns/admin/propagate_service.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/services/concerns/admin/propagate_service.rb b/app/services/concerns/admin/propagate_service.rb index fb53dd9727b841..065ab6f7ff9302 100644 --- a/app/services/concerns/admin/propagate_service.rb +++ b/app/services/concerns/admin/propagate_service.rb @@ -4,7 +4,7 @@ module Admin module PropagateService extend ActiveSupport::Concern - BATCH_SIZE = 100 + BATCH_SIZE = 10_000 class_methods do def propagate(integration) -- GitLab