diff --git a/app/models/application_record.rb b/app/models/application_record.rb index 0979d03f6e6e2fd953bf5bda1eed35163033c15b..426bf098032f767a8d7157614fc2e332fd099d8e 100644 --- a/app/models/application_record.rb +++ b/app/models/application_record.rb @@ -1,6 +1,10 @@ # frozen_string_literal: true +require_dependency Rails.root.join('lib/gitlab/database/bulk_insert_support') + class ApplicationRecord < ActiveRecord::Base + include ::Gitlab::Database::BulkInsertSupport + self.abstract_class = true alias_method :reset, :reload diff --git a/lib/gitlab/database/active_record_insert_all_backport/insert_all.rb b/lib/gitlab/database/active_record_insert_all_backport/insert_all.rb new file mode 100644 index 0000000000000000000000000000000000000000..f908b2dd153a09c09ce7840b4ab4d733430d0eac --- /dev/null +++ b/lib/gitlab/database/active_record_insert_all_backport/insert_all.rb @@ -0,0 +1,220 @@ +# frozen_string_literal: true + +# Adds Rails 6 `InsertAll` feature. +# Can be removed once we're on Rails 6. +module ActiveRecord + module ConnectionAdapters + class PostgreSQLAdapter < AbstractAdapter + def build_insert_sql(insert) # :nodoc: + sql = +"INSERT #{insert.into} #{insert.values_list}" + + if insert.skip_duplicates? + sql << " ON CONFLICT #{insert.conflict_target} DO NOTHING" + elsif insert.update_duplicates? + sql << " ON CONFLICT #{insert.conflict_target} DO UPDATE SET " + sql << insert.updatable_columns.map { |column| "#{column}=excluded.#{column}" }.join(",") + end + + sql << " RETURNING #{insert.returning}" if insert.returning + sql + end + + def supports_insert_returning? + true + end + + # Fixture value is quoted by Arel, however scalar values + # are not quotable. In this case we want to convert + # the column value to YAML. 
+ def with_yaml_fallback(value) # :nodoc: + if value.is_a?(Hash) || value.is_a?(Array) + YAML.dump(value) + else + value + end + end + end + end + + module Persistence + module ClassMethods + def insert_all!(attributes, returning: nil) + InsertAll.new(self, attributes, on_duplicate: :raise, returning: returning).execute + end + end + end + + class InsertAll # :nodoc: + attr_reader :model, :connection, :inserts, :keys + attr_reader :on_duplicate, :returning, :unique_by + + def initialize(model, inserts, on_duplicate:, returning: nil, unique_by: nil) + raise ArgumentError, "Empty list of attributes passed" if inserts.blank? + + @model, @connection, @inserts, @keys = model, model.connection, inserts, inserts.first.keys.map(&:to_s).to_set + @on_duplicate, @returning, @unique_by = on_duplicate, returning, unique_by + + @returning = (connection.supports_insert_returning? ? primary_keys : false) if @returning.nil? + @returning = false if @returning == [] + + @unique_by = find_unique_index_for(unique_by) if unique_by + @on_duplicate = :skip if @on_duplicate == :update && updatable_columns.empty? + + ensure_valid_options_for_connection! + end + + def execute + message = +"#{model} " + message << "Bulk " if inserts.many? + message << (on_duplicate == :update ? "Upsert" : "Insert") + connection.exec_query to_sql, message + end + + def updatable_columns + keys - readonly_columns - unique_by_columns + end + + def primary_keys + Array(model.primary_key) + end + + def skip_duplicates? + on_duplicate == :skip + end + + def update_duplicates? 
+ on_duplicate == :update + end + + def map_key_with_value + inserts.map do |attributes| + attributes = attributes.stringify_keys + verify_attributes(attributes) + + keys.map do |key| + yield key, attributes[key] + end + end + end + + private + + def find_unique_index_for(unique_by) + match = Array(unique_by).map(&:to_s) + + if index = unique_indexes.find { |i| match.include?(i.name) || i.columns == match } + index + else + raise ArgumentError, "No unique index found for #{unique_by}" + end + end + + def unique_indexes + connection.schema_cache.indexes(model.table_name).select(&:unique) + end + + def ensure_valid_options_for_connection! + if returning && !connection.supports_insert_returning? + raise ArgumentError, "#{connection.class} does not support :returning" + end + + if skip_duplicates? && !connection.supports_insert_on_duplicate_skip? + raise ArgumentError, "#{connection.class} does not support skipping duplicates" + end + + if update_duplicates? && !connection.supports_insert_on_duplicate_update? + raise ArgumentError, "#{connection.class} does not support upsert" + end + + if unique_by && !connection.supports_insert_conflict_target? 
+ raise ArgumentError, "#{connection.class} does not support :unique_by" + end + end + + def to_sql + connection.build_insert_sql(ActiveRecord::InsertAll::Builder.new(self)) + end + + def readonly_columns + primary_keys + model.readonly_attributes.to_a + end + + def unique_by_columns + Array(unique_by&.columns) + end + + def verify_attributes(attributes) + if keys != attributes.keys.to_set + raise ArgumentError, "All objects being inserted must have the same keys\n" \ + "expected: #{keys}\ngot: #{attributes.keys}" + end + end + + class Builder # :nodoc: + attr_reader :model + + delegate :skip_duplicates?, :update_duplicates?, :keys, to: :insert_all + + def initialize(insert_all) + @insert_all, @model, @connection = insert_all, insert_all.model, insert_all.connection + end + + def into + "INTO #{model.quoted_table_name}(#{columns_list})" + end + + def values_list + types = extract_types_from_columns_on(model.table_name, keys: keys) + + values_list = insert_all.map_key_with_value do |key, value| + connection.with_yaml_fallback(types[key].serialize(value)) + end + + Arel::InsertManager.new.create_values_list(values_list).to_sql + end + + def returning + format_columns(insert_all.returning) if insert_all.returning + end + + def conflict_target + if index = insert_all.unique_by + sql = +"(#{format_columns(index.columns)})" + sql << " WHERE #{index.where}" if index.where + sql + elsif update_duplicates? 
+ "(#{format_columns(insert_all.primary_keys)})" + end + end + + def updatable_columns + quote_columns(insert_all.updatable_columns) + end + + private + + attr_reader :connection, :insert_all + + def columns_list + format_columns(insert_all.keys) + end + + def extract_types_from_columns_on(table_name, keys:) + columns = connection.schema_cache.columns_hash(table_name) + + unknown_column = (keys - columns.keys).first + raise UnknownAttributeError.new(model.new, unknown_column) if unknown_column + + keys.map { |key| [key, connection.lookup_cast_type_from_column(columns[key])] }.to_h + end + + def format_columns(columns) + quote_columns(columns).join(",") + end + + def quote_columns(columns) + columns.map(&connection.method(:quote_column_name)) + end + end + end +end diff --git a/lib/gitlab/database/bulk_insert_support.rb b/lib/gitlab/database/bulk_insert_support.rb new file mode 100644 index 0000000000000000000000000000000000000000..65a347bce0ae655114ee9782cd131b4f4a64c546 --- /dev/null +++ b/lib/gitlab/database/bulk_insert_support.rb @@ -0,0 +1,223 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module BulkInsertSupport + extend ActiveSupport::Concern + + NestedCallError = Class.new(StandardError) + TargetTypeError = Class.new(StandardError) + + DELAYED_CALLBACKS = [:save, :create].freeze + + class BulkInsertState + attr_reader :captures, :callbacks + + def initialize + @captures = [] + @callbacks = nil + end + + def add_capture(capture) + @captures << capture + end + + def inject_last_values!(values) + @captures.last[:values] = values + end + + def store_callbacks(callbacks) + @callbacks = callbacks.deep_dup + end + + def callbacks_for(name) + @callbacks[name].deep_dup + end + end + + # We currently reach into internal APIs, which requires __send__ + # rubocop: disable GitlabSecurity/PublicSend + class_methods do + def save_all!(*items, batch_size: 100) + raise NestedCallError.new("Cannot nest bulk inserts") if _gl_bulk_inserts_requested? 
+ + # Until we find a better way to delay AR callbacks than rewriting them, + # we need to protect the entire transaction from other threads. Since + # callbacks are class-level instances, they might otherwise interfere + # with each other. + # This essentially means that bulk inserts are entirely sequential even + # in multi-threaded setups. + @_gl_bulk_insert_lock ||= Mutex.new + @_gl_bulk_insert_lock.synchronize do + items.flatten.each_slice(batch_size) do |item_batch| + self.transaction do + _gl_set_bulk_insert_state + _gl_delay_after_callbacks + + item_batch.each do |item| + unless item.is_a?(self) + raise TargetTypeError.new("Wrong instance type %s, expected T <= %s" % [item.class, self]) + end + + item.save! + end + + _gl_bulk_insert + _gl_restore_callbacks + _gl_replay_callbacks + end + end + ensure + # might be called redundantly, but if we terminated abnormally anywhere + # due to an exception, we must restore the class' callback structure + _gl_restore_callbacks + _gl_release_bulk_insert_state + end + end + + def _gl_bulk_inserts_requested? + !!_gl_bulk_insert_state + end + + def _gl_capture_record(record) + _gl_bulk_insert_state.add_capture(record) + end + + # Overrides ActiveRecord::Persistence::ClassMethods._insert_record + def _insert_record(values) + if _gl_bulk_inserts_requested? 
+ _gl_bulk_insert_state.inject_last_values!(values) + else + super + end + end + + private + + def _gl_bulk_insert_state + Thread.current[:_gl_bulk_insert_state]&.fetch(self, nil) + end + + def _gl_set_bulk_insert_state + # register per class; state of other classes on this thread must survive nested calls + states = (Thread.current[:_gl_bulk_insert_state] ||= {}) + states[self] = BulkInsertState.new + end + + def _gl_release_bulk_insert_state + Thread.current[:_gl_bulk_insert_state]&.delete(self) + end + + def _gl_delay_after_callbacks + # make backup of original callback chain + _gl_bulk_insert_state.store_callbacks(__callbacks) + + # remove after_* callbacks, so that they won't be invoked + DELAYED_CALLBACKS.each do |cb| + _gl_suppress_callbacks(_gl_get_callback_chain(cb)) { |callback| callback.kind == :after } + end + end + + def _gl_restore_callbacks + return unless _gl_bulk_insert_state + + stored_callbacks = _gl_bulk_insert_state.callbacks + + if stored_callbacks&.any? + DELAYED_CALLBACKS.each do |cb| + _gl_set_callback_chain(stored_callbacks[cb], cb) + end + end + end + + def _gl_replay_callbacks + _gl_bulk_insert_state.captures.each do |capture| + target = capture[:target] + _gl_run_callback(target, :after, :create) + _gl_run_callback(target, :after, :save) + end + end + + # Because `run_callbacks` will run _all_ callbacks for e.g. `save`, + # we need to filter out anything that is not of the given `kind` first. + # + # TODO: this is currently hacky and inefficient, since it involves a lot + # of copying of callback structures. This only needs to happen once per + # replay, not for every item. 
+ def _gl_run_callback(target, kind, name) + # obtain a copy of the stored/original callbacks for `name` + requested_callbacks = _gl_bulk_insert_state.callbacks_for(name) + + # retain only callback hooks of the requested kind + _gl_suppress_callbacks(requested_callbacks) { |cb| cb.kind != kind } + _gl_set_callback_chain(requested_callbacks, name) + + target.run_callbacks name + + # restore callbacks to previous state + _gl_restore_callbacks + end + + def _gl_get_callback_chain(name) + __send__("_#{name}_callbacks") + end + + # `callback_chain`: ActiveSupport::Callbacks::CallbackChain + # `name`: :save, :create, ... + def _gl_set_callback_chain(callback_chain, name) + __send__("_#{name}_callbacks=", callback_chain) + __callbacks[name] = callback_chain + end + + # `callback_chain`: ActiveSupport::Callbacks::CallbackChain + # `block`: predicate; callbacks for which it is truthy are removed in place + def _gl_suppress_callbacks(callback_chain, &block) + callback_chain.__send__(:chain).reject!(&block) + end + + def _gl_bulk_insert + # obtains all captured model instances & row values + captures = _gl_bulk_insert_state.captures + + values = captures.map { |c| c[:values] } + + all_keys = values.inject(Set.new) do |key_set, attributes| + key_set.merge(attributes.keys) + end + + # backfill nulls for missing keys only; present values (incl. `false`) stay as-is + values.each do |row| + all_keys.each do |key| + row[key] = nil unless row.key?(key) + end + end + + result = insert_all!(values, returning: [:id]) + ids = result.map { |row| row['id'] } + + # inject row IDs back into model instances + if ids && ids.any? + captures.zip(ids).each { |c, id| c[:target].id = id } + end + end + end + + private + + # Overrides ActiveRecord::*._create_record + def _create_record(*) + if self.class._gl_bulk_inserts_requested? + # We need to hold on to the current instance, so that we can + # inject IDs into them later on. 
+ # The row `values` will be injected later in `_insert_record`/ + self.class._gl_capture_record({ + target: self, + values: nil + }) + end + + super + end + end + end +end diff --git a/spec/lib/gitlab/database/bulk_insert_support_spec.rb b/spec/lib/gitlab/database/bulk_insert_support_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..ec67679fe786c978c1e08bab6caa097b1839b819 --- /dev/null +++ b/spec/lib/gitlab/database/bulk_insert_support_spec.rb @@ -0,0 +1,334 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Gitlab::Database::BulkInsertSupport do + class ItemDependency < ApplicationRecord + belongs_to :bulk_insert_item + end + + class BulkInsertItem < ApplicationRecord + attr_reader :after, :callback_invocations + attr_writer :fail_before_save, :fail_after_save, :fail_after_commit + + after_initialize do + @callback_invocations = { + before_save: 0, + before_create: 0, + after_create: 0, + after_save: 0, + after_commit: 0, + sequence: [] + } + end + + has_one :item_dependency + + after_commit do + @callback_invocations[:after_commit] += 1 + @callback_invocations[:sequence] << :after_commit + + raise "'id' must be set in after_commit" unless self.id + end + + before_create do + @callback_invocations[:before_create] += 1 + @callback_invocations[:sequence] << :before_create + end + + before_save do + @callback_invocations[:before_save] += 1 + @callback_invocations[:sequence] << :before_save + + raise "failed in before_save" if @fail_before_save + + # create something within the outer transaction so + # we can test rollbacks + ItemDependency.create!(name: 'before_save') + + self.before = "#{name} set from before_save" + end + + after_save do + @callback_invocations[:after_save] += 1 + @callback_invocations[:sequence] << :after_save + raise "'id' must be set in after_save" unless self.id + + raise "failed in after_save" if @fail_after_save + + @after = "#{name} set from after_save" + end + + after_create do + 
@callback_invocations[:after_create] += 1 + @callback_invocations[:sequence] << :after_create + raise "'id' must be set in after_create" unless self.id + + raise "failed in after_commit" if @fail_after_commit + end + end + + before(:all) do + ActiveRecord::Schema.define do + create_table :bulk_insert_items, force: true do |t| + t.string :name, null: false + t.string :before, null: true + end + + create_table :item_dependencies, force: true do |t| + t.string :name + t.belongs_to :bulk_insert_item, null: true + end + end + end + + after(:all) do + ActiveRecord::Schema.define do + drop_table :item_dependencies, force: true + drop_table :bulk_insert_items, force: true + end + end + + describe 'save_all!' do + let(:good_item) { new_item } + let(:bad_item) { new_item(name: nil) } + + it 'inserts all items from list' do + items = Array.new(2) { new_item } + + expect do + BulkInsertItem.save_all!(items) + end.to change { BulkInsertItem.count }.from(0).to(2) + + BulkInsertItem.all.each do |item| + expect(item.id).not_to be_nil + expect(item.name).to eq("item") + end + end + + it 'inserts all items from varargs' do + expect do + BulkInsertItem.save_all!(new_item(name: "one"), new_item(name: "two")) + end.to change { BulkInsertItem.count }.from(0).to(2) + + inserted = BulkInsertItem.all + expect(inserted.size).to eq(2) + expect(inserted.first.id).not_to be_nil + expect(inserted.first.name).to eq('one') + expect(inserted.last.id).not_to be_nil + expect(inserted.last.name).to eq('two') + end + + it 'maintains correct AR callback order' do + items = [new_item(name: "one"), new_item(name: "two")] + + BulkInsertItem.save_all!(items) + + expect(items.map { |i| i.callback_invocations[:sequence] }).to all(eq([ + :before_save, + :before_create, + :after_create, + :after_save, + :after_commit + ])) + end + + context 'before_save action' do + it 'runs once for each item before INSERT' do + item1 = new_item(name: "one") + item2 = new_item(name: "two") + + BulkInsertItem.save_all!(item1, 
item2) + + expect(item1.callback_invocations[:before_save]).to eq(1) + expect(item2.callback_invocations[:before_save]).to eq(1) + + expect(item1.before).to eq("one set from before_save") + expect(item2.before).to eq("two set from before_save") + + # these changes must survive a reload from DB, or they wouldn't have been persisted + expect(item1.reload.before).to eq("one set from before_save") + expect(item2.reload.before).to eq("two set from before_save") + end + + it 'rolls back changes when failed' do + item1 = new_item(name: "one") + item2 = new_item(name: "two") + item2.fail_before_save = true + + expect do + BulkInsertItem.save_all!(item1, item2) + rescue + end.not_to change { ItemDependency.count } + end + end + + context 'before_create action' do + it 'runs once for each item before INSERT' do + items = [new_item(name: "one"), new_item(name: "two")] + + BulkInsertItem.save_all!(items) + + expect(items.map { |i| i.callback_invocations[:before_create] }).to all(eq(1)) + end + end + + context 'after_save action' do + it 'runs once for each item after INSERT' do + item1 = new_item(name: "one") + item2 = new_item(name: "two") + + BulkInsertItem.save_all!(item1, item2) + + expect(item1.callback_invocations[:after_save]).to eq(1) + expect(item2.callback_invocations[:after_save]).to eq(1) + + expect(item1.after).to eq("one set from after_save") + expect(item2.after).to eq("two set from after_save") + end + + it 'rolls back changes when failed' do + item1 = new_item(name: "one") + item2 = new_item(name: "two") + item2.fail_after_save = true + + expect do + BulkInsertItem.save_all!(item1, item2) + rescue + end.not_to change { BulkInsertItem.count } + expect(ItemDependency.count).to eq(0) + end + end + + context 'after_create action' do + it 'runs once for each item after INSERT' do + items = [new_item(name: "one"), new_item(name: "two")] + + BulkInsertItem.save_all!(items) + + expect(items.map { |i| i.callback_invocations[:after_create] }).to all(eq(1)) + end + + it 
'rolls back changes when failed' do + item1 = new_item(name: "one") + item2 = new_item(name: "two") + item2.fail_after_commit = true + + expect do + BulkInsertItem.save_all!(item1, item2) + rescue + end.not_to change { BulkInsertItem.count } + expect(ItemDependency.count).to eq(0) + end + end + + context 'with concurrency' do + let(:num_threads) { 10 } + + it 'works when called from multiple threads for same class' do + threads = Array.new(num_threads) do + Thread.new { new_item.tap { |i| BulkInsertItem.save_all!(i) } } + end + + saved_items = threads.map(&:value) + + expect(BulkInsertItem.count).to eq(num_threads) + expect(saved_items.map { |i| i.callback_invocations[:sequence] }).to all(eq([ + :before_save, + :before_create, + :after_create, + :after_save, + :after_commit + ])) + end + + it 'works when called from multiple threads for different classes' do + threads = [ + Thread.new { BulkInsertItem.save_all!(new_item) }, + Thread.new { ItemDependency.save_all!(new_item_dep) }, + Thread.new { BulkInsertItem.save_all!(new_item) }, + Thread.new { ItemDependency.save_all!(new_item_dep) } + ] + + threads.each(&:join) + + expect(BulkInsertItem.count).to eq(2) + expect(ItemDependency.count).to eq(2 + 2) # also created from before_save + end + end + + context 'with batch size' do + it 'performs bulk insert for each batch' do + items = Array.new(5) { |n| new_item(name: "item#{n}") } + item_values = items.map do |i| + { "before": "#{i.name} set from before_save", "name": i.name }.stringify_keys + end + + expect(ActiveRecord::InsertAll).to receive(:new) + .with(BulkInsertItem, item_values[0..1], on_duplicate: :raise, returning: [:id]) + .and_call_original + expect(ActiveRecord::InsertAll).to receive(:new) + .with(BulkInsertItem, item_values[2..3], on_duplicate: :raise, returning: [:id]) + .and_call_original + expect(ActiveRecord::InsertAll).to receive(:new) + .with(BulkInsertItem, item_values[4..-1], on_duplicate: :raise, returning: [:id]) + .and_call_original + + expect { 
BulkInsertItem.save_all!(items, batch_size: 2) }.to( + change { BulkInsertItem.count }.from(0).to(5) + ) + end + end + + it 'throws when some or all items are not of the specified type' do + expect { BulkInsertItem.save_all!(good_item, "not a `BulkInsertItem`") }.to( + raise_error(Gitlab::Database::BulkInsertSupport::TargetTypeError) + ) + end + + context 'when calls are nested' do + class WithIllegalNestedCall < BulkInsertItem + after_save -> { WithIllegalNestedCall.save_all!(self) } + end + + class WithLegalNestedCall < BulkInsertItem + after_save -> { ItemDependency.save_all!(ItemDependency.new(name: 'ok')) } + end + + it 'throws when called on the same class' do + expect { WithIllegalNestedCall.save_all!(WithIllegalNestedCall.new(name: 'not ok')) }.to( + raise_error(Gitlab::Database::BulkInsertSupport::NestedCallError) + ) + end + + it 'allows calls for other types' do + expect { WithLegalNestedCall.save_all!(WithLegalNestedCall.new(name: 'ok')) }.to( + change { ItemDependency.count }.from(0).to(2) # count the extra insertion from before_save + ) + end + end + + context 'when failures are present' do + it 'propagates failures' do + expect { BulkInsertItem.save_all!(bad_item) }.to raise_error(ActiveRecord::NotNullViolation) + end + + it 'rolls back all changes' do + expect do + BulkInsertItem.save_all!(good_item, bad_item) + rescue + end.not_to change { BulkInsertItem.count } + end + end + end + + private + + def new_item(name: 'item', dep: nil) + BulkInsertItem.new(name: name, item_dependency: dep) + end + + def new_item_dep(name: 'item_dep') + ItemDependency.new(name: name) + end +end