From 423d9e3a304de37032429d8ae1c102a51fbb8d31 Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Thu, 12 Dec 2019 08:25:06 +0100 Subject: [PATCH 01/12] Very early POC for bulk-insert API This is not fully functional yet. --- app/models/application_record.rb | 4 + .../database/bulk_ops/bulk_insert_support.rb | 105 ++++++++++++++++++ 2 files changed, 109 insertions(+) create mode 100644 lib/gitlab/database/bulk_ops/bulk_insert_support.rb diff --git a/app/models/application_record.rb b/app/models/application_record.rb index 0979d03f6e6e2f..cb989f6db37124 100644 --- a/app/models/application_record.rb +++ b/app/models/application_record.rb @@ -1,6 +1,10 @@ # frozen_string_literal: true +require_dependency Rails.root.join('lib/gitlab/database/bulk_ops/bulk_insert_support') + class ApplicationRecord < ActiveRecord::Base + include ::Gitlab::Database::BulkInsertSupport + self.abstract_class = true alias_method :reset, :reload diff --git a/lib/gitlab/database/bulk_ops/bulk_insert_support.rb b/lib/gitlab/database/bulk_ops/bulk_insert_support.rb new file mode 100644 index 00000000000000..e84bebee615e2f --- /dev/null +++ b/lib/gitlab/database/bulk_ops/bulk_insert_support.rb @@ -0,0 +1,105 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module BulkInsertSupport + extend ActiveSupport::Concern + + AlreadyActiveError = Class.new(StandardError) + + class BulkInsertState + attr_reader :captures + + def self.set!(clazz, instance = BulkInsertState.new) + p "request bulk inserts for #{clazz}" + Thread.current[:_gl_bulk_insert_state] ||= {} + Thread.current[:_gl_bulk_insert_state][clazz] = instance + end + + def self.unset!(clazz) + set!(clazz, nil) + end + + def self.get(clazz) + Thread.current[:_gl_bulk_insert_state]&.fetch(clazz, nil) + end + + def initialize + @captures = [] + end + + def add_capture!(capture) + @captures << capture + end + + def inject_last_values!(values) + @captures.last[:set_values].call(values) + end + end + + class_methods do + def 
with_bulk_inserts(&block) + raise AlreadyActiveError.new("Cannot nest bulk inserts") if BulkInsertState.get(self) + + # TODO: this should probably be bound to a transaction somehow + BulkInsertState.set!(self) + + self.transaction do + yield + _gl_bulk_insert! + end + + ensure + BulkInsertState.unset!(self) + end + + def _gl_bulk_inserts_requested? + p "requested for #{self} = #{!!BulkInsertState.get(self)}" + !!BulkInsertState.get(self) + end + + # pre-conditions: `BulkInsertState` != nil && BulkInsertState.get.captures != nil + def _gl_bulk_insert! + p "// flushing delayed inserts" + + captures = BulkInsertState.get(self).captures + + values = captures.map { |c| c[:values] } + + ids = Gitlab::Database.bulk_insert(table_name, values, return_ids: true) + p "inserted IDs: #{ids.inspect}" + + if ids && ids.any? + captures.zip(ids).each { |c, id| c[:set_id].call(id) } + end + # TODO: assert ids.size == values.size + end + + # Overrides ActiveRecord::Persistence::ClassMethods._insert_record + def _insert_record(values) + if _gl_bulk_inserts_requested? + p "!! delaying row #{values} for bulk insert" + BulkInsertState.get(self).inject_last_values!(values) + else + super + end + end + end + + private + + # Overrides ActiveRecord::*._create_record + def _create_record(*) + if self.class._gl_bulk_inserts_requested? 
+ record_capture = { + set_id: ->(id) { self.id ||= id }, + set_values: -> (values) { record_capture[:values] = values } + } + BulkInsertState.get(self.class).add_capture!(record_capture) + end + + super + end + end + end +end -- GitLab From a651e625f32247a49715bb8224f1fa419cb20a90 Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Thu, 12 Dec 2019 16:33:49 +0100 Subject: [PATCH 02/12] Run after_save hooks after INSERT This is a major refactoring that unhinges after_save callbacks and replaces them after we run a bulk INSERT --- .../database/bulk_ops/bulk_insert_support.rb | 148 ++++++++++++------ 1 file changed, 102 insertions(+), 46 deletions(-) diff --git a/lib/gitlab/database/bulk_ops/bulk_insert_support.rb b/lib/gitlab/database/bulk_ops/bulk_insert_support.rb index e84bebee615e2f..1bdcb97a1ccb2b 100644 --- a/lib/gitlab/database/bulk_ops/bulk_insert_support.rb +++ b/lib/gitlab/database/bulk_ops/bulk_insert_support.rb @@ -8,82 +8,136 @@ module BulkInsertSupport AlreadyActiveError = Class.new(StandardError) class BulkInsertState - attr_reader :captures - - def self.set!(clazz, instance = BulkInsertState.new) - p "request bulk inserts for #{clazz}" - Thread.current[:_gl_bulk_insert_state] ||= {} - Thread.current[:_gl_bulk_insert_state][clazz] = instance - end - - def self.unset!(clazz) - set!(clazz, nil) - end - - def self.get(clazz) - Thread.current[:_gl_bulk_insert_state]&.fetch(clazz, nil) - end + attr_reader :captures, :callbacks def initialize @captures = [] + @callbacks = nil end - def add_capture!(capture) + def add_capture(capture) @captures << capture end def inject_last_values!(values) - @captures.last[:set_values].call(values) + @captures.last[:values] = values + end + + def store_callbacks(callbacks) + @callbacks = callbacks end end class_methods do def with_bulk_inserts(&block) - raise AlreadyActiveError.new("Cannot nest bulk inserts") if BulkInsertState.get(self) - - # TODO: this should probably be bound to a transaction somehow - 
BulkInsertState.set!(self) + raise AlreadyActiveError.new("Cannot nest bulk inserts") if _gl_bulk_inserts_requested? self.transaction do + p "BEGIN TRANSACTION" + + _gl_set_bulk_insert_state + _gl_delay_callbacks + yield - _gl_bulk_insert! + + _gl_bulk_insert + _gl_restore_callbacks + _gl_replay_callbacks + + p "END TRANSACTION" end ensure - BulkInsertState.unset!(self) + _gl_release_bulk_insert_state end def _gl_bulk_inserts_requested? - p "requested for #{self} = #{!!BulkInsertState.get(self)}" - !!BulkInsertState.get(self) + !!_gl_bulk_insert_state end - # pre-conditions: `BulkInsertState` != nil && BulkInsertState.get.captures != nil - def _gl_bulk_insert! - p "// flushing delayed inserts" - - captures = BulkInsertState.get(self).captures - - values = captures.map { |c| c[:values] } - - ids = Gitlab::Database.bulk_insert(table_name, values, return_ids: true) - p "inserted IDs: #{ids.inspect}" - - if ids && ids.any? - captures.zip(ids).each { |c, id| c[:set_id].call(id) } - end - # TODO: assert ids.size == values.size + def _gl_capture_record(record) + _gl_bulk_insert_state.add_capture(record) end # Overrides ActiveRecord::Persistence::ClassMethods._insert_record def _insert_record(values) if _gl_bulk_inserts_requested? p "!! delaying row #{values} for bulk insert" - BulkInsertState.get(self).inject_last_values!(values) + _gl_bulk_insert_state.inject_last_values!(values) else super end end + + private + + def _gl_bulk_insert_state + Thread.current[:_gl_bulk_insert_state]&.fetch(self.class, nil) + end + + def _gl_set_bulk_insert_state + Thread.current[:_gl_bulk_insert_state] = { + self.class => BulkInsertState.new + } + end + + def _gl_release_bulk_insert_state + Thread.current[:_gl_bulk_insert_state] = nil + end + + def _gl_delay_callbacks + # make backup of original callback chain + _gl_bulk_insert_state.store_callbacks(_save_callbacks.deep_dup) + + # remove after_save callbacks, so that they won't be invoked + _save_callbacks.send(:chain).reject! 
{ |c| c.kind == :after } + end + + def _gl_restore_callbacks + stored_callbacks = _gl_bulk_insert_state.callbacks + + _save_callbacks = stored_callbacks + __callbacks[:save] = stored_callbacks + end + + def _gl_replay_callbacks + p "REPLAYING after_save" + _gl_bulk_insert_state.captures.each do |capture| + target = capture[:target] + _gl_run_save_callback(target, :after) + end + end + + # Because `run_callbacks` will run _all_ callbacks for e.g. `save`, + # we need to filter out anything that is not of the given `kind` first. + def _gl_run_save_callback(target, kind) + current_callbacks = _save_callbacks.deep_dup + + requested_chain = _save_callbacks.send(:chain) + requested_chain.reject! { |c| c.kind != kind } + + target.run_callbacks :save + + _save_callbacks = current_callbacks + __callbacks[:save] = current_callbacks + end + + def _gl_bulk_insert + p "BULK INSERT" + + # obtains all captured model instances & row values + captures = _gl_bulk_insert_state.captures + + values = captures.map { |c| c[:values] } + + ids = Gitlab::Database.bulk_insert(table_name, values, return_ids: true) + p "--> row IDs: #{ids.inspect}" + + # inject row IDs back into model instances + if ids && ids.any? + captures.zip(ids).each { |c, id| c[:target].id = id } + end + end end private @@ -91,11 +145,13 @@ def _insert_record(values) # Overrides ActiveRecord::*._create_record def _create_record(*) if self.class._gl_bulk_inserts_requested? - record_capture = { - set_id: ->(id) { self.id ||= id }, - set_values: -> (values) { record_capture[:values] = values } - } - BulkInsertState.get(self.class).add_capture!(record_capture) + # We need to hold on to the current instance, so that we can + # inject IDs into them later on. 
+ # The row `values` will be injected later in `_insert_record`/ + self.class._gl_capture_record({ + target: self, + values: nil + }) end super -- GitLab From 37e5ea8e89caf6847c452deb65145ea049486499 Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Mon, 16 Dec 2019 09:17:10 +0100 Subject: [PATCH 03/12] Change primary bulk insert API This change ensures that no unexpected side effects can occur --- lib/gitlab/database/bulk_ops/bulk_insert_support.rb | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/gitlab/database/bulk_ops/bulk_insert_support.rb b/lib/gitlab/database/bulk_ops/bulk_insert_support.rb index 1bdcb97a1ccb2b..28874bf1713a99 100644 --- a/lib/gitlab/database/bulk_ops/bulk_insert_support.rb +++ b/lib/gitlab/database/bulk_ops/bulk_insert_support.rb @@ -29,7 +29,7 @@ def store_callbacks(callbacks) end class_methods do - def with_bulk_inserts(&block) + def save_all!(*items) raise AlreadyActiveError.new("Cannot nest bulk inserts") if _gl_bulk_inserts_requested? self.transaction do @@ -38,7 +38,11 @@ def with_bulk_inserts(&block) _gl_set_bulk_insert_state _gl_delay_callbacks - yield + items.each do |item| + raise "Wrong instance type %s, expected T < %s" % [item.class, self] unless item.is_a? self + + item.save!
+ end _gl_bulk_insert _gl_restore_callbacks @@ -46,7 +50,6 @@ def with_bulk_inserts(&block) p "END TRANSACTION" end - ensure _gl_release_bulk_insert_state end -- GitLab From 62f906d081021017dec88719e442aecd71ac5ec3 Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Tue, 17 Dec 2019 09:44:05 +0100 Subject: [PATCH 04/12] Add test suite This also fixes numerous bugs I found in the process --- app/models/application_record.rb | 2 +- .../{bulk_ops => }/bulk_insert_support.rb | 62 +++--- .../database/bulk_insert_support_spec.rb | 193 ++++++++++++++++++ 3 files changed, 231 insertions(+), 26 deletions(-) rename lib/gitlab/database/{bulk_ops => }/bulk_insert_support.rb (64%) create mode 100644 spec/lib/gitlab/database/bulk_insert_support_spec.rb diff --git a/app/models/application_record.rb b/app/models/application_record.rb index cb989f6db37124..426bf098032f76 100644 --- a/app/models/application_record.rb +++ b/app/models/application_record.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -require_dependency Rails.root.join('lib/gitlab/database/bulk_ops/bulk_insert_support') +require_dependency Rails.root.join('lib/gitlab/database/bulk_insert_support') class ApplicationRecord < ActiveRecord::Base include ::Gitlab::Database::BulkInsertSupport diff --git a/lib/gitlab/database/bulk_ops/bulk_insert_support.rb b/lib/gitlab/database/bulk_insert_support.rb similarity index 64% rename from lib/gitlab/database/bulk_ops/bulk_insert_support.rb rename to lib/gitlab/database/bulk_insert_support.rb index 28874bf1713a99..913ff4c35282b0 100644 --- a/lib/gitlab/database/bulk_ops/bulk_insert_support.rb +++ b/lib/gitlab/database/bulk_insert_support.rb @@ -5,7 +5,8 @@ module Database module BulkInsertSupport extend ActiveSupport::Concern - AlreadyActiveError = Class.new(StandardError) + NestedCallError = Class.new(StandardError) + TargetTypeError = Class.new(StandardError) class BulkInsertState attr_reader :captures, :callbacks @@ -24,22 +25,26 @@ def inject_last_values!(values) end def 
store_callbacks(callbacks) - @callbacks = callbacks + @callbacks = callbacks.deep_dup + end + + def callbacks_for(name) + @callbacks[name].deep_dup end end class_methods do def save_all!(*items) - raise AlreadyActiveError.new("Cannot nest bulk inserts") if _gl_bulk_inserts_requested? + raise NestedCallError.new("Cannot nest bulk inserts") if _gl_bulk_inserts_requested? self.transaction do - p "BEGIN TRANSACTION" - _gl_set_bulk_insert_state _gl_delay_callbacks - items.each do |item| - raise "Wrong instance type %s, expected T < %s" % [item.class, self] unless item.is_a? self + items.flatten.each do |item| + unless item.is_a?(self) + raise TargetTypeError.new("Wrong instance type %s, expected T <= %s" % [item.class, self]) + end item.save! end @@ -47,10 +52,11 @@ def save_all!(*items) _gl_bulk_insert _gl_restore_callbacks _gl_replay_callbacks - - p "END TRANSACTION" end ensure + # might be called redundantly, but if we terminated abnormally anywhere + # due to an exception, we must restore the class' callback structure + _gl_restore_callbacks _gl_release_bulk_insert_state end @@ -65,7 +71,6 @@ def _gl_capture_record(record) # Overrides ActiveRecord::Persistence::ClassMethods._insert_record def _insert_record(values) if _gl_bulk_inserts_requested? - p "!! 
delaying row #{values} for bulk insert" _gl_bulk_insert_state.inject_last_values!(values) else super @@ -75,12 +80,12 @@ def _insert_record(values) private def _gl_bulk_insert_state - Thread.current[:_gl_bulk_insert_state]&.fetch(self.class, nil) + Thread.current[:_gl_bulk_insert_state]&.fetch(self, nil) end def _gl_set_bulk_insert_state Thread.current[:_gl_bulk_insert_state] = { - self.class => BulkInsertState.new + self => BulkInsertState.new } end @@ -90,21 +95,24 @@ def _gl_release_bulk_insert_state def _gl_delay_callbacks # make backup of original callback chain - _gl_bulk_insert_state.store_callbacks(_save_callbacks.deep_dup) + _gl_bulk_insert_state.store_callbacks(__callbacks) # remove after_save callbacks, so that they won't be invoked - _save_callbacks.send(:chain).reject! { |c| c.kind == :after } + _save_callbacks.__send__(:chain).reject! { |c| c.kind == :after } end def _gl_restore_callbacks + return unless _gl_bulk_insert_state + stored_callbacks = _gl_bulk_insert_state.callbacks - _save_callbacks = stored_callbacks - __callbacks[:save] = stored_callbacks + if stored_callbacks&.any? + _save_callbacks = stored_callbacks[:save] + __callbacks[:save] = stored_callbacks[:save] + end end def _gl_replay_callbacks - p "REPLAYING after_save" _gl_bulk_insert_state.captures.each do |capture| target = capture[:target] _gl_run_save_callback(target, :after) @@ -113,28 +121,32 @@ def _gl_replay_callbacks # Because `run_callbacks` will run _all_ callbacks for e.g. `save`, # we need to filter out anything that is not of the given `kind` first. + # + # TODO: this is currently hacky and inefficient, since it involves a lot + # of copying of callback structures. This only needs to happen once per + # replay, not for every item. 
def _gl_run_save_callback(target, kind) - current_callbacks = _save_callbacks.deep_dup + # obtain a copy of the stored/original save callbacks + requested_callbacks = _gl_bulk_insert_state.callbacks_for(:save) - requested_chain = _save_callbacks.send(:chain) - requested_chain.reject! { |c| c.kind != kind } + # retain only callback hooks of the requested kind + requested_callbacks.__send__(:chain).reject! { |c| c.kind != kind } + _save_callbacks = requested_callbacks + __callbacks[:save] = requested_callbacks target.run_callbacks :save - _save_callbacks = current_callbacks - __callbacks[:save] = current_callbacks + # restore callbacks to previous state + _gl_restore_callbacks end def _gl_bulk_insert - p "BULK INSERT" - # obtains all captured model instances & row values captures = _gl_bulk_insert_state.captures values = captures.map { |c| c[:values] } ids = Gitlab::Database.bulk_insert(table_name, values, return_ids: true) - p "--> row IDs: #{ids.inspect}" # inject row IDs back into model instances if ids && ids.any? 
diff --git a/spec/lib/gitlab/database/bulk_insert_support_spec.rb b/spec/lib/gitlab/database/bulk_insert_support_spec.rb new file mode 100644 index 00000000000000..4e7acdb5b3f93a --- /dev/null +++ b/spec/lib/gitlab/database/bulk_insert_support_spec.rb @@ -0,0 +1,193 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Gitlab::Database::BulkInsertSupport do + class ItemDependency < ApplicationRecord + belongs_to :bulk_insert_item + end + + class BulkInsertItem < ApplicationRecord + attr_reader :after, :before_save_count, :after_save_count + attr_writer :fail_before_save + attr_writer :fail_after_save + + after_initialize do + @before_save_count = 0 + @after_save_count = 0 + end + + has_one :item_dependency + + before_save -> { + @before_save_count += 1 + + raise "failed in before_save" if @fail_before_save + + self.before = "#{name} set from before_save" + } + + after_save -> { + @after_save_count += 1 + + raise "failed in after_save" if @fail_after_save + + @after = "#{name} set from after_save" + } + end + + before(:all) do + ActiveRecord::Schema.define do + create_table :bulk_insert_items, force: true do |t| + t.string :name, null: false + t.string :before, null: true + end + + create_table :item_dependencies, force: true do |t| + t.string :name + t.belongs_to :bulk_insert_item, null: true + end + end + end + + after(:all) do + ActiveRecord::Schema.define do + drop_table :item_dependencies, force: true + drop_table :bulk_insert_items, force: true + end + end + + describe 'save_all!' 
do + let(:good_item) { new_item } + let(:bad_item) { new_item(name: nil) } + + it 'inserts all items from list' do + items = Array.new(2) { new_item } + + expect do + BulkInsertItem.save_all!(items) + end.to change { BulkInsertItem.count }.from(0).to(2) + + BulkInsertItem.all do |item| + expect(item.id).not_to be_nil + expect(item.name).to eq("item") + end + end + + it 'inserts all items from varargs' do + expect do + BulkInsertItem.save_all!(new_item(name: "one"), new_item(name: "two")) + end.to change { BulkInsertItem.count }.from(0).to(2) + + inserted = BulkInsertItem.all + expect(inserted.size).to eq(2) + expect(inserted.first.id).not_to be_nil + expect(inserted.first.name).to eq('one') + expect(inserted.last.id).not_to be_nil + expect(inserted.last.name).to eq('two') + end + + context 'before_save action' do + it 'runs once before INSERT' do + item1 = new_item(name: "one") + item2 = new_item(name: "two") + + BulkInsertItem.save_all!(item1, item2) + + expect(item1.before_save_count).to eq(1) + expect(item2.before_save_count).to eq(1) + + expect(item1.before).to eq("one set from before_save") + expect(item2.before).to eq("two set from before_save") + + # these changes must survive a reload from DB, or they wouldn't have been persisted + expect(item1.reload.before).to eq("one set from before_save") + expect(item2.reload.before).to eq("two set from before_save") + end + + it 'rolls back INSERT when failed' do + item1 = new_item(name: "one") + item2 = new_item(name: "two") + item2.fail_before_save = true + + expect do + BulkInsertItem.save_all!(item1, item2) + rescue + end.not_to change { BulkInsertItem.count } + end + end + + context 'after_save action' do + it 'runs once after INSERT' do + item1 = new_item(name: "one") + item2 = new_item(name: "two") + + BulkInsertItem.save_all!(item1, item2) + + expect(item1.after_save_count).to eq(1) + expect(item2.after_save_count).to eq(1) + + expect(item1.after).to eq("one set from after_save") + expect(item2.after).to 
eq("two set from after_save") + end + + it 'rolls back INSERT when failed' do + item1 = new_item(name: "one") + item2 = new_item(name: "two") + item2.fail_after_save = true + + expect do + BulkInsertItem.save_all!(item1, item2) + rescue + end.not_to change { BulkInsertItem.count } + end + end + + it 'throws when some or all items are not of the specified type' do + expect { BulkInsertItem.save_all!(good_item, "not a `BulkInsertItem`") }.to( + raise_error(Gitlab::Database::BulkInsertSupport::TargetTypeError) + ) + end + + context 'when calls are nested' do + class WithIllegalNestedCall < BulkInsertItem + after_save -> { WithIllegalNestedCall.save_all!(self) } + end + + class WithLegalNestedCall < BulkInsertItem + after_save -> { ItemDependency.save_all!(ItemDependency.new(name: 'ok')) } + end + + it 'throws when called on the same class' do + expect { WithIllegalNestedCall.save_all!(WithIllegalNestedCall.new(name: 'not ok')) }.to( + raise_error(Gitlab::Database::BulkInsertSupport::NestedCallError) + ) + end + + it 'allows calls for other types' do + expect { WithLegalNestedCall.save_all!(WithLegalNestedCall.new(name: 'ok')) }.to( + change { ItemDependency.count }.from(0).to(1) + ) + end + end + + context 'when failures are present' do + it 'propagates failures' do + expect { BulkInsertItem.save_all!(bad_item) }.to raise_error(ActiveRecord::NotNullViolation) + end + + it 'rolls back all changes' do + expect do + BulkInsertItem.save_all!(good_item, bad_item) + rescue + end.not_to change { BulkInsertItem.count } + end + end + end + + private + + def new_item(name: 'item', dep: nil) + BulkInsertItem.new(name: name, item_dependency: dep) + end +end -- GitLab From cdd00ba0675ca55af29a237cf842c637b830070b Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Tue, 17 Dec 2019 11:53:20 +0100 Subject: [PATCH 05/12] Implement support for create callbacks - before_create - after_create And clean up impl a bit --- lib/gitlab/database/bulk_insert_support.rb | 50 ++++++-- 
.../database/bulk_insert_support_spec.rb | 113 +++++++++++++++--- 2 files changed, 133 insertions(+), 30 deletions(-) diff --git a/lib/gitlab/database/bulk_insert_support.rb b/lib/gitlab/database/bulk_insert_support.rb index 913ff4c35282b0..ef63216851dd33 100644 --- a/lib/gitlab/database/bulk_insert_support.rb +++ b/lib/gitlab/database/bulk_insert_support.rb @@ -8,6 +8,8 @@ module BulkInsertSupport NestedCallError = Class.new(StandardError) TargetTypeError = Class.new(StandardError) + DELAYED_CALLBACKS = [:save, :create].freeze + class BulkInsertState attr_reader :captures, :callbacks @@ -33,13 +35,15 @@ def callbacks_for(name) end end + # We currently reach into internal APIs, which requires __send__ + # rubocop: disable GitlabSecurity/PublicSend class_methods do def save_all!(*items) raise NestedCallError.new("Cannot nest bulk inserts") if _gl_bulk_inserts_requested? self.transaction do _gl_set_bulk_insert_state - _gl_delay_callbacks + _gl_delay_after_callbacks items.flatten.each do |item| unless item.is_a?(self) @@ -93,12 +97,14 @@ def _gl_release_bulk_insert_state Thread.current[:_gl_bulk_insert_state] = nil end - def _gl_delay_callbacks + def _gl_delay_after_callbacks # make backup of original callback chain _gl_bulk_insert_state.store_callbacks(__callbacks) - # remove after_save callbacks, so that they won't be invoked - _save_callbacks.__send__(:chain).reject! { |c| c.kind == :after } + # remove after_* callbacks, so that they won't be invoked + DELAYED_CALLBACKS.each do |cb| + _gl_suppress_callbacks(_gl_get_callback_chain(cb)) { |cb| cb.kind == :after } + end end def _gl_restore_callbacks @@ -107,15 +113,17 @@ def _gl_restore_callbacks stored_callbacks = _gl_bulk_insert_state.callbacks if stored_callbacks&.any? 
- _save_callbacks = stored_callbacks[:save] - __callbacks[:save] = stored_callbacks[:save] + DELAYED_CALLBACKS.each do |cb| + _gl_set_callback_chain(stored_callbacks[cb], cb) + end end end def _gl_replay_callbacks _gl_bulk_insert_state.captures.each do |capture| target = capture[:target] - _gl_run_save_callback(target, :after) + _gl_run_callback(target, :after, :create) + _gl_run_callback(target, :after, :save) end end @@ -125,21 +133,37 @@ def _gl_replay_callbacks # TODO: this is currently hacky and inefficient, since it involves a lot # of copying of callback structures. This only needs to happen once per # replay, not for every item. - def _gl_run_save_callback(target, kind) + def _gl_run_callback(target, kind, name) # obtain a copy of the stored/original save callbacks - requested_callbacks = _gl_bulk_insert_state.callbacks_for(:save) + requested_callbacks = _gl_bulk_insert_state.callbacks_for(name) # retain only callback hooks of the requested kind - requested_callbacks.__send__(:chain).reject! { |c| c.kind != kind } - _save_callbacks = requested_callbacks - __callbacks[:save] = requested_callbacks + _gl_suppress_callbacks(requested_callbacks) { |cb| cb.kind != kind } + _gl_set_callback_chain(requested_callbacks, name) - target.run_callbacks :save + target.run_callbacks name # restore callbacks to previous state _gl_restore_callbacks end + def _gl_get_callback_chain(name) + __send__("_#{name}_callbacks") + end + + # `callback_chain`: ActiveSupport::Callbacks::CallbackChain + # `name`: :save, :create, ... + def _gl_set_callback_chain(callback_chain, name) + __send__("_#{name}_callbacks=", callback_chain) + __callbacks[name] = callback_chain + end + + # `callback_chain`: ActiveSupport::Callbacks::CallbackChain + # `kind`: :before, :after, ... 
+ def _gl_suppress_callbacks(callback_chain, &block) + callback_chain.__send__(:chain).reject!(&block) + end + def _gl_bulk_insert # obtains all captured model instances & row values captures = _gl_bulk_insert_state.captures diff --git a/spec/lib/gitlab/database/bulk_insert_support_spec.rb b/spec/lib/gitlab/database/bulk_insert_support_spec.rb index 4e7acdb5b3f93a..0e157bbaacdfa0 100644 --- a/spec/lib/gitlab/database/bulk_insert_support_spec.rb +++ b/spec/lib/gitlab/database/bulk_insert_support_spec.rb @@ -8,32 +8,64 @@ class ItemDependency < ApplicationRecord end class BulkInsertItem < ApplicationRecord - attr_reader :after, :before_save_count, :after_save_count - attr_writer :fail_before_save - attr_writer :fail_after_save + attr_reader :after, :callback_invocations + attr_writer :fail_before_save, :fail_after_save, :fail_after_commit after_initialize do - @before_save_count = 0 - @after_save_count = 0 + @callback_invocations = { + before_save: 0, + before_create: 0, + after_create: 0, + after_save: 0, + after_commit: 0, + sequence: [] + } end has_one :item_dependency + after_commit -> { + @callback_invocations[:after_commit] += 1 + @callback_invocations[:sequence] << :after_commit + + raise "`id` must be set in after_commit" unless self.id + } + + before_create -> { + @callback_invocations[:before_create] += 1 + @callback_invocations[:sequence] << :before_create + } + before_save -> { - @before_save_count += 1 + @callback_invocations[:before_save] += 1 + @callback_invocations[:sequence] << :before_save raise "failed in before_save" if @fail_before_save + # create something within the outer transaction so + # we can test rollbacks + ItemDependency.create!(name: 'before_save') + self.before = "#{name} set from before_save" } after_save -> { - @after_save_count += 1 + @callback_invocations[:after_save] += 1 + @callback_invocations[:sequence] << :after_save + raise "`id` must be set in after_save" unless self.id raise "failed in after_save" if @fail_after_save 
@after = "#{name} set from after_save" } + + after_create -> { + @callback_invocations[:after_create] += 1 + @callback_invocations[:sequence] << :after_create + raise "`id` must be set in after_create" unless self.id + + raise "failed in after_commit" if @fail_after_commit + } end before(:all) do @@ -87,15 +119,29 @@ class BulkInsertItem < ApplicationRecord expect(inserted.last.name).to eq('two') end + it 'maintains correct AR callback order' do + items = [new_item(name: "one"), new_item(name: "two")] + + BulkInsertItem.save_all!(items) + + expect(items.map { |i| i.callback_invocations[:sequence] }).to all(eq([ + :before_save, + :before_create, + :after_create, + :after_save, + :after_commit + ])) + end + context 'before_save action' do - it 'runs once before INSERT' do + it 'runs once for each item before INSERT' do item1 = new_item(name: "one") item2 = new_item(name: "two") BulkInsertItem.save_all!(item1, item2) - expect(item1.before_save_count).to eq(1) - expect(item2.before_save_count).to eq(1) + expect(item1.callback_invocations[:before_save]).to eq(1) + expect(item2.callback_invocations[:before_save]).to eq(1) expect(item1.before).to eq("one set from before_save") expect(item2.before).to eq("two set from before_save") @@ -105,7 +151,7 @@ class BulkInsertItem < ApplicationRecord expect(item2.reload.before).to eq("two set from before_save") end - it 'rolls back INSERT when failed' do + it 'rolls back changes when failed' do item1 = new_item(name: "one") item2 = new_item(name: "two") item2.fail_before_save = true @@ -113,25 +159,35 @@ class BulkInsertItem < ApplicationRecord expect do BulkInsertItem.save_all!(item1, item2) rescue - end.not_to change { BulkInsertItem.count } + end.not_to change { ItemDependency.count } + end + end + + context 'before_create action' do + it 'runs once for each item before INSERT' do + items = [new_item(name: "one"), new_item(name: "two")] + + BulkInsertItem.save_all!(items) + + expect(items.map { |i| 
i.callback_invocations[:before_create] }).to all(eq(1)) end end context 'after_save action' do - it 'runs once after INSERT' do + it 'runs once for each item after INSERT' do item1 = new_item(name: "one") item2 = new_item(name: "two") BulkInsertItem.save_all!(item1, item2) - expect(item1.after_save_count).to eq(1) - expect(item2.after_save_count).to eq(1) + expect(item1.callback_invocations[:before_save]).to eq(1) + expect(item2.callback_invocations[:after_save]).to eq(1) expect(item1.after).to eq("one set from after_save") expect(item2.after).to eq("two set from after_save") end - it 'rolls back INSERT when failed' do + it 'rolls back changes when failed' do item1 = new_item(name: "one") item2 = new_item(name: "two") item2.fail_after_save = true @@ -140,6 +196,29 @@ class BulkInsertItem < ApplicationRecord BulkInsertItem.save_all!(item1, item2) rescue end.not_to change { BulkInsertItem.count } + expect(ItemDependency.count).to eq(0) + end + end + + context 'after_create action' do + it 'runs once for each item before INSERT' do + items = [new_item(name: "one"), new_item(name: "two")] + + BulkInsertItem.save_all!(items) + + expect(items.map { |i| i.callback_invocations[:after_create] }).to all(eq(1)) + end + + it 'rolls back changes when failed' do + item1 = new_item(name: "one") + item2 = new_item(name: "two") + item2.fail_after_commit = true + + expect do + BulkInsertItem.save_all!(item1, item2) + rescue + end.not_to change { BulkInsertItem.count } + expect(ItemDependency.count).to eq(0) end end @@ -166,7 +245,7 @@ class WithLegalNestedCall < BulkInsertItem it 'allows calls for other types' do expect { WithLegalNestedCall.save_all!(WithLegalNestedCall.new(name: 'ok')) }.to( - change { ItemDependency.count }.from(0).to(1) + change { ItemDependency.count }.from(0).to(2) # count the extra insertion from before_save ) end end -- GitLab From a623139ab266adf73b0e40519979a09ec8507a23 Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Tue, 17 Dec 2019 15:33:57 +0100 
Subject: [PATCH 06/12] Use a mutex for now to ensure thread safety --- lib/gitlab/database/bulk_insert_support.rb | 41 +++++++++++++--------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/lib/gitlab/database/bulk_insert_support.rb b/lib/gitlab/database/bulk_insert_support.rb index ef63216851dd33..59194a64d38a5f 100644 --- a/lib/gitlab/database/bulk_insert_support.rb +++ b/lib/gitlab/database/bulk_insert_support.rb @@ -41,27 +41,36 @@ def callbacks_for(name) def save_all!(*items) raise NestedCallError.new("Cannot nest bulk inserts") if _gl_bulk_inserts_requested? - self.transaction do - _gl_set_bulk_insert_state - _gl_delay_after_callbacks - - items.flatten.each do |item| - unless item.is_a?(self) - raise TargetTypeError.new("Wrong instance type %s, expected T <= %s" % [item.class, self]) + # Until we find a better way to delay AR callbacks than rewriting them, + # we need to protect the entire transaction from other threads. Since + # callbacks are class-level instances, they might otherwise interfere + # with each other. + # This essentially means that bulk inserts are entirely sequential even + # in multi-threaded setups. + @_gl_bulk_insert_lock ||= Mutex.new + @_gl_bulk_insert_lock.synchronize do + self.transaction do + _gl_set_bulk_insert_state + _gl_delay_after_callbacks + + items.flatten.each do |item| + unless item.is_a?(self) + raise TargetTypeError.new("Wrong instance type %s, expected T <= %s" % [item.class, self]) + end + + item.save! end - item.save! 
+ _gl_bulk_insert + _gl_restore_callbacks + _gl_replay_callbacks end - - _gl_bulk_insert + ensure + # might be called redundantly, but if we terminated abnormally anywhere + # due to an exception, we must restore the class' callback structure _gl_restore_callbacks - _gl_replay_callbacks + _gl_release_bulk_insert_state end - ensure - # might be called redundantly, but if we terminated abnormally anywhere - # due to an exception, we must restore the class' callback structure - _gl_restore_callbacks - _gl_release_bulk_insert_state end def _gl_bulk_inserts_requested? -- GitLab From 1568e988ce6ff09d6f3f949a10e6fef1eebba108 Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Tue, 17 Dec 2019 15:45:18 +0100 Subject: [PATCH 07/12] Attempting to fix rubocop This was only breaking on CI :shrug: --- .../database/bulk_insert_support_spec.rb | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/spec/lib/gitlab/database/bulk_insert_support_spec.rb b/spec/lib/gitlab/database/bulk_insert_support_spec.rb index 0e157bbaacdfa0..053a8749b452fe 100644 --- a/spec/lib/gitlab/database/bulk_insert_support_spec.rb +++ b/spec/lib/gitlab/database/bulk_insert_support_spec.rb @@ -24,19 +24,19 @@ class BulkInsertItem < ApplicationRecord has_one :item_dependency - after_commit -> { + after_commit do @callback_invocations[:after_commit] += 1 @callback_invocations[:sequence] << :after_commit - raise "`id` must be set in after_commit" unless self.id - } + raise "'id' must be set in after_commit" unless self.id + end - before_create -> { + before_create do @callback_invocations[:before_create] += 1 @callback_invocations[:sequence] << :before_create - } + end - before_save -> { + before_save do @callback_invocations[:before_save] += 1 @callback_invocations[:sequence] << :before_save @@ -47,25 +47,25 @@ class BulkInsertItem < ApplicationRecord ItemDependency.create!(name: 'before_save') self.before = "#{name} set from before_save" - } + end - after_save -> { + 
after_save do @callback_invocations[:after_save] += 1 @callback_invocations[:sequence] << :after_save - raise "`id` must be set in after_save" unless self.id + raise "'id' must be set in after_save" unless self.id raise "failed in after_save" if @fail_after_save @after = "#{name} set from after_save" - } + end - after_create -> { + after_create do @callback_invocations[:after_create] += 1 @callback_invocations[:sequence] << :after_create - raise "`id` must be set in after_create" unless self.id + raise "'id' must be set in after_create" unless self.id raise "failed in after_commit" if @fail_after_commit - } + end end before(:all) do -- GitLab From 64cf9ea558342fec8d68396e06d9b7971b52f741 Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Tue, 17 Dec 2019 16:41:48 +0100 Subject: [PATCH 08/12] Add some tests for concurrent scenarios --- .../database/bulk_insert_support_spec.rb | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/spec/lib/gitlab/database/bulk_insert_support_spec.rb b/spec/lib/gitlab/database/bulk_insert_support_spec.rb index 053a8749b452fe..d3c8e30f54a1c5 100644 --- a/spec/lib/gitlab/database/bulk_insert_support_spec.rb +++ b/spec/lib/gitlab/database/bulk_insert_support_spec.rb @@ -222,6 +222,41 @@ class BulkInsertItem < ApplicationRecord end end + context 'with concurrency' do + let(:num_threads) { 10 } + + it 'works when called from multiple threads for same class' do + threads = Array.new(num_threads) { Thread.new { + new_item.tap { |i| BulkInsertItem.save_all!(i) } + }} + + saved_items = threads.map(&:value) + + expect(BulkInsertItem.count).to eq(num_threads) + expect(saved_items.map { |i| i.callback_invocations[:sequence] }).to all(eq([ + :before_save, + :before_create, + :after_create, + :after_save, + :after_commit + ])) + end + + it 'works when called from multiple threads for different classes' do + threads = [ + Thread.new { BulkInsertItem.save_all!(new_item) }, + Thread.new { ItemDependency.save_all!(new_item_dep) }, + 
Thread.new { BulkInsertItem.save_all!(new_item) }, + Thread.new { ItemDependency.save_all!(new_item_dep) } + ] + + threads.each(&:join) + + expect(BulkInsertItem.count).to eq(2) + expect(ItemDependency.count).to eq(2 + 2) # also created from before_save + end + end + it 'throws when some or all items are not of the specified type' do expect { BulkInsertItem.save_all!(good_item, "not a `BulkInsertItem`") }.to( raise_error(Gitlab::Database::BulkInsertSupport::TargetTypeError) @@ -269,4 +304,8 @@ class WithLegalNestedCall < BulkInsertItem def new_item(name: 'item', dep: nil) BulkInsertItem.new(name: name, item_dependency: dep) end + + def new_item_dep(name: 'item_dep') + ItemDependency.new(name: name) + end end -- GitLab From 9350cb1f8bab93dbfd29443e441ea00eb373a55c Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Wed, 18 Dec 2019 08:31:32 +0100 Subject: [PATCH 09/12] Add support for batching Even with bulk inserts, we must ensure that we do not insert overly large batches to not blow statement size or risk connection timeouts. --- lib/gitlab/database/bulk_insert_support.rb | 26 ++++++++++--------- .../database/bulk_insert_support_spec.rb | 20 ++++++++++++++ 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/lib/gitlab/database/bulk_insert_support.rb b/lib/gitlab/database/bulk_insert_support.rb index 59194a64d38a5f..eb4c2aed54ae8d 100644 --- a/lib/gitlab/database/bulk_insert_support.rb +++ b/lib/gitlab/database/bulk_insert_support.rb @@ -38,7 +38,7 @@ def callbacks_for(name) # We currently reach into internal APIs, which requires __send__ # rubocop: disable GitlabSecurity/PublicSend class_methods do - def save_all!(*items) + def save_all!(*items, batch_size: 100) raise NestedCallError.new("Cannot nest bulk inserts") if _gl_bulk_inserts_requested? # Until we find a better way to delay AR callbacks than rewriting them, @@ -49,21 +49,23 @@ def save_all!(*items) # in multi-threaded setups. 
@_gl_bulk_insert_lock ||= Mutex.new @_gl_bulk_insert_lock.synchronize do - self.transaction do - _gl_set_bulk_insert_state - _gl_delay_after_callbacks + items.flatten.each_slice(batch_size) do |item_batch| + self.transaction do + _gl_set_bulk_insert_state + _gl_delay_after_callbacks - items.flatten.each do |item| - unless item.is_a?(self) - raise TargetTypeError.new("Wrong instance type %s, expected T <= %s" % [item.class, self]) + item_batch.each do |item| + unless item.is_a?(self) + raise TargetTypeError.new("Wrong instance type %s, expected T <= %s" % [item.class, self]) + end + + item.save! end - item.save! + _gl_bulk_insert + _gl_restore_callbacks + _gl_replay_callbacks end - - _gl_bulk_insert - _gl_restore_callbacks - _gl_replay_callbacks end ensure # might be called redundantly, but if we terminated abnormally anywhere diff --git a/spec/lib/gitlab/database/bulk_insert_support_spec.rb b/spec/lib/gitlab/database/bulk_insert_support_spec.rb index d3c8e30f54a1c5..f2fe03525c5916 100644 --- a/spec/lib/gitlab/database/bulk_insert_support_spec.rb +++ b/spec/lib/gitlab/database/bulk_insert_support_spec.rb @@ -257,6 +257,26 @@ class BulkInsertItem < ApplicationRecord end end + context 'with batch size' do + it 'performs bulk insert for each batch' do + allow(Gitlab::Database).to receive(:bulk_insert).and_call_original + items = Array.new(5) { |n| new_item(name: "item#{n}") } + item_values = items.map do |i| + { "before": "#{i.name} set from before_save", "name": i.name }.stringify_keys + end + + BulkInsertItem.save_all!(items, batch_size: 2) + + # should produce 3 INSERTs: [one, two], [three, four], [five] + expect(Gitlab::Database).to have_received(:bulk_insert) + .with('bulk_insert_items', item_values[0..1], return_ids: true) + expect(Gitlab::Database).to have_received(:bulk_insert) + .with('bulk_insert_items', item_values[2..3], return_ids: true) + expect(Gitlab::Database).to have_received(:bulk_insert) + .with('bulk_insert_items', item_values[4..-1], return_ids: 
true) + end + end + it 'throws when some or all items are not of the specified type' do expect { BulkInsertItem.save_all!(good_item, "not a `BulkInsertItem`") }.to( raise_error(Gitlab::Database::BulkInsertSupport::TargetTypeError) -- GitLab From f6a4b5838e582d0f88802b8ffc457341eaf35c08 Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Wed, 18 Dec 2019 08:31:47 +0100 Subject: [PATCH 10/12] Fix rubocop offenses --- spec/lib/gitlab/database/bulk_insert_support_spec.rb | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/spec/lib/gitlab/database/bulk_insert_support_spec.rb b/spec/lib/gitlab/database/bulk_insert_support_spec.rb index f2fe03525c5916..1760c38bc60c51 100644 --- a/spec/lib/gitlab/database/bulk_insert_support_spec.rb +++ b/spec/lib/gitlab/database/bulk_insert_support_spec.rb @@ -226,10 +226,10 @@ class BulkInsertItem < ApplicationRecord let(:num_threads) { 10 } it 'works when called from multiple threads for same class' do - threads = Array.new(num_threads) { Thread.new { - new_item.tap { |i| BulkInsertItem.save_all!(i) } - }} - + threads = Array.new(num_threads) do + Thread.new { new_item.tap { |i| BulkInsertItem.save_all!(i) } } + end + saved_items = threads.map(&:value) expect(BulkInsertItem.count).to eq(num_threads) @@ -241,7 +241,7 @@ class BulkInsertItem < ApplicationRecord :after_commit ])) end - + it 'works when called from multiple threads for different classes' do threads = [ Thread.new { BulkInsertItem.save_all!(new_item) }, @@ -249,7 +249,7 @@ class BulkInsertItem < ApplicationRecord Thread.new { BulkInsertItem.save_all!(new_item) }, Thread.new { ItemDependency.save_all!(new_item_dep) } ] - + threads.each(&:join) expect(BulkInsertItem.count).to eq(2) -- GitLab From 3b2df1ceaf4fc3ac2625ccc9092c24d604958250 Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Wed, 18 Dec 2019 08:35:47 +0100 Subject: [PATCH 11/12] Fix wrong action tested in spec --- spec/lib/gitlab/database/bulk_insert_support_spec.rb | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/lib/gitlab/database/bulk_insert_support_spec.rb b/spec/lib/gitlab/database/bulk_insert_support_spec.rb index 1760c38bc60c51..65fc1068d5d705 100644 --- a/spec/lib/gitlab/database/bulk_insert_support_spec.rb +++ b/spec/lib/gitlab/database/bulk_insert_support_spec.rb @@ -180,7 +180,7 @@ class BulkInsertItem < ApplicationRecord BulkInsertItem.save_all!(item1, item2) - expect(item1.callback_invocations[:before_save]).to eq(1) + expect(item1.callback_invocations[:after_save]).to eq(1) expect(item2.callback_invocations[:after_save]).to eq(1) expect(item1.after).to eq("one set from after_save") -- GitLab From c1f950d25de75fa6c470f5a64bed6f860956db1f Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Thu, 19 Dec 2019 17:38:25 +0100 Subject: [PATCH 12/12] Backport insert_all from Rails 6 This replaces our internal bulk_insert call with the new feature from ActiveRecord 6. --- .../insert_all.rb | 220 ++++++++++++++++++ lib/gitlab/database/bulk_insert_support.rb | 14 +- .../database/bulk_insert_support_spec.rb | 23 +- 3 files changed, 246 insertions(+), 11 deletions(-) create mode 100644 lib/gitlab/database/active_record_insert_all_backport/insert_all.rb diff --git a/lib/gitlab/database/active_record_insert_all_backport/insert_all.rb b/lib/gitlab/database/active_record_insert_all_backport/insert_all.rb new file mode 100644 index 00000000000000..f908b2dd153a09 --- /dev/null +++ b/lib/gitlab/database/active_record_insert_all_backport/insert_all.rb @@ -0,0 +1,220 @@ +# frozen_string_literal: true + +# Adds Rails 6 `InsertAll` feature. +# Can be removed once we're on Rails 6. +module ActiveRecord + module ConnectionAdapters + class PostgreSQLAdapter < AbstractAdapter + def build_insert_sql(insert) # :nodoc: + sql = +"INSERT #{insert.into} #{insert.values_list}" + + if insert.skip_duplicates? + sql << " ON CONFLICT #{insert.conflict_target} DO NOTHING" + elsif insert.update_duplicates? 
+ sql << " ON CONFLICT #{insert.conflict_target} DO UPDATE SET " + sql << insert.updatable_columns.map { |column| "#{column}=excluded.#{column}" }.join(",") + end + + sql << " RETURNING #{insert.returning}" if insert.returning + sql + end + + def supports_insert_returning? + true + end + + # Fixture value is quoted by Arel, however scalar values + # are not quotable. In this case we want to convert + # the column value to YAML. + def with_yaml_fallback(value) # :nodoc: + if value.is_a?(Hash) || value.is_a?(Array) + YAML.dump(value) + else + value + end + end + end + end + + module Persistence + module ClassMethods + def insert_all!(attributes, returning: nil) + InsertAll.new(self, attributes, on_duplicate: :raise, returning: returning).execute + end + end + end + + class InsertAll # :nodoc: + attr_reader :model, :connection, :inserts, :keys + attr_reader :on_duplicate, :returning, :unique_by + + def initialize(model, inserts, on_duplicate:, returning: nil, unique_by: nil) + raise ArgumentError, "Empty list of attributes passed" if inserts.blank? + + @model, @connection, @inserts, @keys = model, model.connection, inserts, inserts.first.keys.map(&:to_s).to_set + @on_duplicate, @returning, @unique_by = on_duplicate, returning, unique_by + + @returning = (connection.supports_insert_returning? ? primary_keys : false) if @returning.nil? + @returning = false if @returning == [] + + @unique_by = find_unique_index_for(unique_by) if unique_by + @on_duplicate = :skip if @on_duplicate == :update && updatable_columns.empty? + + ensure_valid_options_for_connection! + end + + def execute + message = +"#{model} " + message << "Bulk " if inserts.many? + message << (on_duplicate == :update ? "Upsert" : "Insert") + connection.exec_query to_sql, message + end + + def updatable_columns + keys - readonly_columns - unique_by_columns + end + + def primary_keys + Array(model.primary_key) + end + + def skip_duplicates? + on_duplicate == :skip + end + + def update_duplicates? 
+ on_duplicate == :update + end + + def map_key_with_value + inserts.map do |attributes| + attributes = attributes.stringify_keys + verify_attributes(attributes) + + keys.map do |key| + yield key, attributes[key] + end + end + end + + private + + def find_unique_index_for(unique_by) + match = Array(unique_by).map(&:to_s) + + if index = unique_indexes.find { |i| match.include?(i.name) || i.columns == match } + index + else + raise ArgumentError, "No unique index found for #{unique_by}" + end + end + + def unique_indexes + connection.schema_cache.indexes(model.table_name).select(&:unique) + end + + def ensure_valid_options_for_connection! + if returning && !connection.supports_insert_returning? + raise ArgumentError, "#{connection.class} does not support :returning" + end + + if skip_duplicates? && !connection.supports_insert_on_duplicate_skip? + raise ArgumentError, "#{connection.class} does not support skipping duplicates" + end + + if update_duplicates? && !connection.supports_insert_on_duplicate_update? + raise ArgumentError, "#{connection.class} does not support upsert" + end + + if unique_by && !connection.supports_insert_conflict_target? 
+ raise ArgumentError, "#{connection.class} does not support :unique_by" + end + end + + def to_sql + connection.build_insert_sql(ActiveRecord::InsertAll::Builder.new(self)) + end + + def readonly_columns + primary_keys + model.readonly_attributes.to_a + end + + def unique_by_columns + Array(unique_by&.columns) + end + + def verify_attributes(attributes) + if keys != attributes.keys.to_set + raise ArgumentError, "All objects being inserted must have the same keys\n" \ + "expected: #{keys}\ngot: #{attributes.keys}" + end + end + + class Builder # :nodoc: + attr_reader :model + + delegate :skip_duplicates?, :update_duplicates?, :keys, to: :insert_all + + def initialize(insert_all) + @insert_all, @model, @connection = insert_all, insert_all.model, insert_all.connection + end + + def into + "INTO #{model.quoted_table_name}(#{columns_list})" + end + + def values_list + types = extract_types_from_columns_on(model.table_name, keys: keys) + + values_list = insert_all.map_key_with_value do |key, value| + connection.with_yaml_fallback(types[key].serialize(value)) + end + + Arel::InsertManager.new.create_values_list(values_list).to_sql + end + + def returning + format_columns(insert_all.returning) if insert_all.returning + end + + def conflict_target + if index = insert_all.unique_by + sql = +"(#{format_columns(index.columns)})" + sql << " WHERE #{index.where}" if index.where + sql + elsif update_duplicates? 
+ "(#{format_columns(insert_all.primary_keys)})" + end + end + + def updatable_columns + quote_columns(insert_all.updatable_columns) + end + + private + + attr_reader :connection, :insert_all + + def columns_list + format_columns(insert_all.keys) + end + + def extract_types_from_columns_on(table_name, keys:) + columns = connection.schema_cache.columns_hash(table_name) + + unknown_column = (keys - columns.keys).first + raise UnknownAttributeError.new(model.new, unknown_column) if unknown_column + + keys.map { |key| [key, connection.lookup_cast_type_from_column(columns[key])] }.to_h + end + + def format_columns(columns) + quote_columns(columns).join(",") + end + + def quote_columns(columns) + columns.map(&connection.method(:quote_column_name)) + end + end + end +end diff --git a/lib/gitlab/database/bulk_insert_support.rb b/lib/gitlab/database/bulk_insert_support.rb index eb4c2aed54ae8d..65a347bce0ae65 100644 --- a/lib/gitlab/database/bulk_insert_support.rb +++ b/lib/gitlab/database/bulk_insert_support.rb @@ -181,7 +181,19 @@ def _gl_bulk_insert values = captures.map { |c| c[:values] } - ids = Gitlab::Database.bulk_insert(table_name, values, return_ids: true) + all_keys = values.inject(Set.new) do |key_set, attributes| + key_set.merge(attributes.keys) + end + + # backfill nulls for missing keys + values.each do |row| + all_keys.each do |key| + row[key] ||= nil + end + end + + result = insert_all!(values, returning: [:id]) + ids = result.map { |row| row['id'] } # inject row IDs back into model instances if ids && ids.any? 
diff --git a/spec/lib/gitlab/database/bulk_insert_support_spec.rb b/spec/lib/gitlab/database/bulk_insert_support_spec.rb index 65fc1068d5d705..ec67679fe786c9 100644 --- a/spec/lib/gitlab/database/bulk_insert_support_spec.rb +++ b/spec/lib/gitlab/database/bulk_insert_support_spec.rb @@ -259,21 +259,24 @@ class BulkInsertItem < ApplicationRecord context 'with batch size' do it 'performs bulk insert for each batch' do - allow(Gitlab::Database).to receive(:bulk_insert).and_call_original items = Array.new(5) { |n| new_item(name: "item#{n}") } item_values = items.map do |i| { "before": "#{i.name} set from before_save", "name": i.name }.stringify_keys end - BulkInsertItem.save_all!(items, batch_size: 2) - - # should produce 3 INSERTs: [one, two], [three, four], [five] - expect(Gitlab::Database).to have_received(:bulk_insert) - .with('bulk_insert_items', item_values[0..1], return_ids: true) - expect(Gitlab::Database).to have_received(:bulk_insert) - .with('bulk_insert_items', item_values[2..3], return_ids: true) - expect(Gitlab::Database).to have_received(:bulk_insert) - .with('bulk_insert_items', item_values[4..-1], return_ids: true) + expect(ActiveRecord::InsertAll).to receive(:new) + .with(BulkInsertItem, item_values[0..1], on_duplicate: :raise, returning: [:id]) + .and_call_original + expect(ActiveRecord::InsertAll).to receive(:new) + .with(BulkInsertItem, item_values[2..3], on_duplicate: :raise, returning: [:id]) + .and_call_original + expect(ActiveRecord::InsertAll).to receive(:new) + .with(BulkInsertItem, item_values[4..-1], on_duplicate: :raise, returning: [:id]) + .and_call_original + + expect { BulkInsertItem.save_all!(items, batch_size: 2) }.to( + change { BulkInsertItem.count }.from(0).to(5) + ) end end -- GitLab