diff --git a/app/models/upload.rb b/app/models/upload.rb
index c1a3df824573ed23d02b9f5b0cd3a05fe79ed457..ac7ebb31abc4c8a2caa8a2b537b25c80db2df6ee 100644
--- a/app/models/upload.rb
+++ b/app/models/upload.rb
@@ -2,6 +2,7 @@
 
 class Upload < ApplicationRecord
   include Checksummable
 
+  # Upper limit for foreground checksum processing
   CHECKSUM_THRESHOLD = 100.megabytes
 
@@ -51,9 +52,9 @@ def begin_fast_destroy
 
     ##
     # FastDestroyAll concerns
-    def finalize_fast_destroy(keys)
-      keys.each do |store_class, paths|
-        store_class.new.delete_keys_async(paths)
+    def finalize_fast_destroy(items_to_remove)
+      items_to_remove.each do |store_class, keys|
+        store_class.new.delete_keys_async(keys)
       end
     end
   end
@@ -65,6 +66,10 @@ def absolute_path
     uploader_class.absolute_path(self)
   end
 
+  def relative_path
+    uploader_class.relative_path(self)
+  end
+
   def calculate_checksum!
     self.checksum = nil
     return unless needs_checksum?
diff --git a/app/models/uploads/local.rb b/app/models/uploads/local.rb
index bd295a6683868656c08765d44606bc34cf2d52b7..9df69998991f27dc5ec5401ab90ee250fac811f8 100644
--- a/app/models/uploads/local.rb
+++ b/app/models/uploads/local.rb
@@ -55,3 +55,5 @@ def storage_dir
     end
   end
 end
+
+Uploads::Local.prepend_mod
diff --git a/app/uploaders/file_uploader.rb b/app/uploaders/file_uploader.rb
index 20aab58243a385c11dc8fdb4e1ad038f71909605..bd959b14648d9684104ab973cd20c5790507a55e 100644
--- a/app/uploaders/file_uploader.rb
+++ b/app/uploaders/file_uploader.rb
@@ -32,7 +32,14 @@ def self.root
 
   def self.absolute_path(upload)
     File.join(
-      absolute_base_dir(upload.model),
+      root,
+      relative_path(upload)
+    )
+  end
+
+  def self.relative_path(upload)
+    File.join(
+      base_dir(upload.model),
       upload.path # already contain the dynamic_segment, see #upload_path
     )
   end
diff --git a/ee/app/models/concerns/geo/blob_replicator_strategy.rb b/ee/app/models/concerns/geo/blob_replicator_strategy.rb
index 9de1345f2041cf5928ec634d96a7073df7f7a9e7..e9a88a4c4c0107060063a8884f77ca666a431b81 100644
--- a/ee/app/models/concerns/geo/blob_replicator_strategy.rb
+++ b/ee/app/models/concerns/geo/blob_replicator_strategy.rb
@@ -98,16 +98,20 @@ def file_exists?
       carrierwave_uploader.file.exists?
     end
 
+    def deleted_params
+      {
+        model_record_id: model_record.id,
+        uploader_class: carrierwave_uploader.class.to_s,
+        blob_path: carrierwave_uploader.relative_path
+      }
+    end
+
     private
 
     def download
       ::Geo::BlobDownloadService.new(replicator: self).execute
     end
 
-    def deleted_params
-      { model_record_id: model_record.id, uploader_class: carrierwave_uploader.class.to_s, blob_path: carrierwave_uploader.relative_path }
-    end
-
     # Return whether it's capable of generating a checksum of itself
     #
     # @return [Boolean] whether it can generate a checksum
diff --git a/ee/app/models/ee/uploads/local.rb b/ee/app/models/ee/uploads/local.rb
new file mode 100644
index 0000000000000000000000000000000000000000..cc4d44f0b114b4da3a95bdf7876b788df393a06b
--- /dev/null
+++ b/ee/app/models/ee/uploads/local.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+module EE
+  module Uploads
+    module Local
+      extend ::Gitlab::Utils::Override
+
+      override :keys
+      def keys(relation)
+        return super unless ::Geo::EventStore.can_create_event?
+
+        relation.includes(:model).find_each.map do |record|
+          record.replicator.deleted_params.merge(absolute_path: record.absolute_path)
+        end
+      end
+
+      override :delete_keys_async
+      def delete_keys_async(keys_to_delete)
+        return super unless ::Geo::EventStore.can_create_event?
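+        # Each batch enqueues one DeleteStoredFilesWorker job and bulk-creates Geo delete events for the batch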
+
+        keys_to_delete.each_slice(::Uploads::Base::BATCH_SIZE) do |batch|
+          ::DeleteStoredFilesWorker.perform_async(self.class, batch.pluck(:absolute_path))
+
+          ::Geo::UploadReplicator.bulk_create_delete_events_async(batch)
+        end
+      end
+    end
+  end
+end
diff --git a/ee/app/models/geo/event_log.rb b/ee/app/models/geo/event_log.rb
index 1d0b779766cd1f5fc42c37bf196b49ac63582571..9c64ca6e0ff2cf24bfe51015592de574c17bfdb9 100644
--- a/ee/app/models/geo/event_log.rb
+++ b/ee/app/models/geo/event_log.rb
@@ -71,6 +71,7 @@ class EventLog < ApplicationRecord
       class_name: 'Geo::Event',
       foreign_key: :geo_event_id,
       inverse_of: :geo_event_log
+
     def self.latest_event
       order(id: :desc).first
     end
diff --git a/ee/app/replicators/geo/upload_replicator.rb b/ee/app/replicators/geo/upload_replicator.rb
index 414cd9d7f107d34e94c68add352536555ae73f99..a03fe14437f23457396e47c468a997352e93ad06 100644
--- a/ee/app/replicators/geo/upload_replicator.rb
+++ b/ee/app/replicators/geo/upload_replicator.rb
@@ -9,6 +9,31 @@ def self.model
       ::Upload
     end
 
+    def self.bulk_create_delete_events_async(deleted_uploads)
+      return if deleted_uploads.empty?
+
+      deleted_upload_details = []
+
+      events = deleted_uploads.map do |upload|
+        deleted_upload_details << [upload[:model_record_id], upload[:blob_path]]
+
+        {
+          replicable_name: 'upload',
+          event_name: 'deleted',
+          payload: {
+            model_record_id: upload[:model_record_id],
+            blob_path: upload[:blob_path],
+            uploader_class: upload[:uploader_class]
+          },
+          created_at: Time.current
+        }
+      end
+
+      log_info('Delete bulk of uploads: ', uploads: deleted_upload_details)
+
+      ::Geo::BatchEventCreateWorker.perform_async(events)
+    end
+
     def carrierwave_uploader
       model_record.retrieve_uploader
     end
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index 9c37ce0b8ab78496879dc4577898fbbae2470ab7..e3852e1cec9d7c0e4466f20d25711f7009329352 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -462,6 +462,15 @@
   :weight: 2
   :idempotent:
   :tags: []
+- :name: geo:geo_batch_event_create
+  :worker_name: Geo::BatchEventCreateWorker
+  :feature_category: :geo_replication
+  :has_external_dependencies:
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: geo:geo_batch_project_registry
   :worker_name: Geo::Batch::ProjectRegistryWorker
   :feature_category: :geo_replication
diff --git a/ee/app/workers/geo/batch_event_create_worker.rb b/ee/app/workers/geo/batch_event_create_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..ad71b5d060157743a637df0383b8094cd90a48eb
--- /dev/null
+++ b/ee/app/workers/geo/batch_event_create_worker.rb
@@ -0,0 +1,20 @@
+# frozen_string_literal: true
+
+module Geo
+  class BatchEventCreateWorker
+    include ApplicationWorker
+
+    data_consistency :always
+
+    include GeoQueue
+    include ::Gitlab::Geo::LogHelpers
+
+    idempotent!
+
+    def perform(events)
+      log_info('Executing Geo::BatchEventCreateWorker', events_count: events.size)
+
+      ::Gitlab::Geo::Replicator.bulk_create_events(events)
+    end
+  end
+end
diff --git a/ee/lib/gitlab/geo/replicator.rb b/ee/lib/gitlab/geo/replicator.rb
index 958dd20a49239dc0dc1da4e8982e04ccc1240d5e..cf49229c5807a478cb97cf463fc9ab48285e5d70 100644
--- a/ee/lib/gitlab/geo/replicator.rb
+++ b/ee/lib/gitlab/geo/replicator.rb
@@ -193,6 +193,20 @@ def self.verification_enabled?
         false
       end
 
+      def self.bulk_create_events(events)
+        ::Geo::EventLog.transaction do
+          results = ::Geo::Event.insert_all!(events)
+
+          break if results.rows.empty?
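+          # insert_all! returns an ActiveRecord::Result; on PostgreSQL it contains the new primary keys,
+          # which are used below to create the matching Geo::EventLog rows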
+
+          ids = results.map { |result| { geo_event_id: result['id'], created_at: Time.current } }
+          ::Geo::EventLog.insert_all!(ids)
+        end
+
+      rescue ActiveRecord::RecordInvalid, NoMethodError => e
+        log_error('Geo::EventLog could not be created in bulk', e)
+      end
+
       # @param [ActiveRecord::Base] model_record
       # @param [Integer] model_record_id
       def initialize(model_record: nil, model_record_id: nil)
@@ -222,7 +236,6 @@ def publish(event_name, **event_data)
         raise ArgumentError, "Unsupported event: '#{event_name}'" unless self.class.event_supported?(event_name)
 
         create_event_with(
-          class_name: ::Geo::Event,
           replicable_name: self.class.replicable_name,
           event_name: event_name,
           payload: event_data
@@ -310,16 +323,15 @@ def event_params
       # Store an event on the database
       #
       # @example Create an event
-      #   create_event_with(class_name: Geo::CacheInvalidationEvent, key: key)
+      #   create_event_with(key: key)
       #
-      # @param [Class] class_name a ActiveRecord class that's used to store an event for Geo
       # @param [Hash] **params context information that will be stored in the event table
       # @return [ApplicationRecord] event instance that was just persisted
-      def create_event_with(class_name:, **params)
+      def create_event_with(**params)
         return unless Gitlab::Geo.primary?
         return unless Gitlab::Geo.secondary_nodes.any?
 
-        event = class_name.create!(**params)
+        event = ::Geo::Event.create!(**params)
 
         # Only works with the new geo_events at the moment because we need to
         # know which foreign key to use
@@ -327,7 +339,7 @@ def create_event_with(class_name:, **params)
 
         event
       rescue ActiveRecord::RecordInvalid, NoMethodError => e
-        log_error("#{class_name} could not be created", e, params)
+        log_error("::Geo::Event could not be created", e, params)
       end
 
       def current_node
diff --git a/ee/spec/lib/gitlab/geo/replicator_spec.rb b/ee/spec/lib/gitlab/geo/replicator_spec.rb
index 3e8770dac67ba90613e68dc4c00c5729f481b94e..5b9c1448bcc474be99d76869823528ee31999c99 100644
--- a/ee/spec/lib/gitlab/geo/replicator_spec.rb
+++ b/ee/spec/lib/gitlab/geo/replicator_spec.rb
@@ -133,6 +133,27 @@
     end
   end
 
+  describe '.bulk_create_events' do
+    let(:event) do
+      {
+        replicable_name: 'upload',
+        event_name: 'created',
+        payload: {
+          data: "some payload"
+        },
+        created_at: Time.current
+      }
+    end
+
+    let(:events) { [event] }
+
+    it 'creates events' do
+      expect { Gitlab::Geo::Replicator.bulk_create_events(events) }.to change { ::Geo::EventLog.count }.from(0).to(1)
+
+      expect(::Geo::EventLog.last.event).to be_present
+    end
+  end
+
   describe '#initialize' do
     subject(:replicator) { Geo::DummyReplicator.new(**args) }
 
diff --git a/ee/spec/models/upload_spec.rb b/ee/spec/models/upload_spec.rb
index 22c26607472a14e2bbe72ffcfa0aaa33467ba7d1..0b331dec601ad23795061d67fe148bf15e28e0bb 100644
--- a/ee/spec/models/upload_spec.rb
+++ b/ee/spec/models/upload_spec.rb
@@ -87,7 +87,7 @@
   end
 
   describe '#destroy' do
-    subject { create(:upload, checksum: '8710d2c16809c79fee211a9693b64038a8aae99561bc86ce98a9b46b45677fe4') }
+    subject { create(:upload, :namespace_upload, checksum: '8710d2c16809c79fee211a9693b64038a8aae99561bc86ce98a9b46b45677fe4') }
 
     context 'when running in a Geo primary node' do
       let_it_be(:primary) { create(:geo_node, :primary) }
@@ -98,6 +98,18 @@
 
         expect { subject.destroy }.to change(Geo::UploadDeletedEvent, :count).by(1)
       end
+
+      it 'logs an event to the Geo event log when bulk removal is used', :sidekiq_inline do
+        stub_current_geo_node(primary)
+
+        expect { subject.model.destroy }.to change(Geo::Event.where(replicable_name: :upload, event_name: :deleted), :count).by(1)
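+        # bulk removal goes through Geo::BatchEventCreateWorker, which records Geo::Event rows
+        # instead of a Geo::UploadDeletedEvent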
+
+        payload = Geo::Event.where(replicable_name: :upload, event_name: :deleted).last.payload
+
+        expect(payload['model_record_id']).to eq(subject.id)
+        expect(payload['blob_path']).to eq(subject.relative_path)
+        expect(payload['uploader_class']).to eq('NamespaceFileUploader')
+      end
     end
   end
 end
diff --git a/ee/spec/models/uploads/local_spec.rb b/ee/spec/models/uploads/local_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..50117cf2aeae688f6d745ad1afc1a12819dea061
--- /dev/null
+++ b/ee/spec/models/uploads/local_spec.rb
@@ -0,0 +1,57 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Uploads::Local, :geo do
+  include ::EE::GeoHelpers
+
+  let(:data_store) { described_class.new }
+
+  before do
+    stub_uploads_object_storage(FileUploader)
+  end
+
+  context 'on a primary when secondary nodes exist' do
+    let(:project) { create(:project) }
+    let(:relation) { project.uploads }
+
+    before do
+      allow(::Geo::EventStore).to receive(:can_create_event?).and_return(true)
+    end
+
+    describe '#keys' do
+      let(:upload) { create(:upload, uploader: FileUploader, model: project) }
+      let!(:uploads) { [upload] }
+
+      it 'returns keys' do
+        keys = data_store.keys(relation)
+
+        expected_hash = {
+          absolute_path: upload.absolute_path,
+          blob_path: upload.retrieve_uploader.relative_path,
+          model_record_id: upload.id,
+          uploader_class: "FileUploader"
+        }
+
+        expect(keys.size).to eq 1
+        expect(keys.first).to include(expected_hash)
+      end
+    end
+
+    describe '#delete_keys_async' do
+      it 'performs calls to DeleteStoredFilesWorker and Geo::UploadReplicator.bulk_create_delete_events_async' do
+        keys_to_delete = [{
+          absolute_path: 'absolute_path',
+          blob_path: 'relative_path',
+          model_record_id: 1,
+          uploader_class: "FileUploader"
+        }]
+
+        expect(::DeleteStoredFilesWorker).to receive(:perform_async).with(Uploads::Local, ['absolute_path'])
+        expect(::Geo::UploadReplicator).to receive(:bulk_create_delete_events_async).with(keys_to_delete)
+
+        data_store.delete_keys_async(keys_to_delete)
+      end
+    end
+  end
+end
diff --git a/ee/spec/replicators/geo/upload_replicator_spec.rb b/ee/spec/replicators/geo/upload_replicator_spec.rb
index aaf41f8ad97cc4a9f3a3d1cf3143cd4f00885781..43c8cf57dd57911451e6812c05fe5e99fe045fa6 100644
--- a/ee/spec/replicators/geo/upload_replicator_spec.rb
+++ b/ee/spec/replicators/geo/upload_replicator_spec.rb
@@ -6,4 +6,30 @@
   let(:model_record) { create(:upload, :with_file) }
 
   include_examples 'a blob replicator'
+
+  describe '.bulk_create_delete_events_async' do
+    let(:deleted_upload) do
+      {
+        model_record_id: 1,
+        blob_path: 'path',
+        uploader_class: 'UploaderClass'
+      }
+    end
+
+    let(:deleted_uploads) { [deleted_upload] }
+
+    it 'calls Geo::BatchEventCreateWorker and passes events array', :sidekiq_inline do
+      expect { described_class.bulk_create_delete_events_async(deleted_uploads) }.to change { ::Geo::Event.count }.from(0).to(1)
+
+      created_event = ::Geo::Event.last
+      expect(created_event.replicable_name).to eq 'upload'
+      expect(created_event.event_name).to eq 'deleted'
+      expect(created_event.created_at).to be_present
+      expect(created_event.payload).to eq(deleted_upload.stringify_keys)
+    end
+
+    it 'returns nil when empty array is passed' do
+      expect(described_class.bulk_create_delete_events_async([])).to be_nil
+    end
+  end
 end
diff --git a/ee/spec/support/helpers/ee/geo_helpers.rb b/ee/spec/support/helpers/ee/geo_helpers.rb
index b567cc240eba766b1c9975e3366ebec74d1a780a..81beee9658a064482297d6261e8fb68de02ce85f 100644
--- a/ee/spec/support/helpers/ee/geo_helpers.rb
+++ b/ee/spec/support/helpers/ee/geo_helpers.rb
@@ -51,7 +51,7 @@ def with_no_geo_database_configured(&block)
     def stub_dummy_replicator_class
       stub_const('Geo::DummyReplicator', Class.new(::Gitlab::Geo::Replicator))
 
-      Geo::DummyReplicator.class_eval do
+      ::Geo::DummyReplicator.class_eval do
         event :test
         event :another_test
 
diff --git a/ee/spec/workers/geo/batch_event_create_worker_spec.rb b/ee/spec/workers/geo/batch_event_create_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..9dfd193051b824586c8ca4e9b4c304e88558b884
--- /dev/null
+++ b/ee/spec/workers/geo/batch_event_create_worker_spec.rb
@@ -0,0 +1,15 @@
+# frozen_string_literal: true
+
+require "spec_helper"
+
+RSpec.describe Geo::BatchEventCreateWorker, :geo do
+  describe "#perform" do
+    it "calls Gitlab::Geo::Replicator.bulk_create_events" do
+      events = []
+
+      expect(::Gitlab::Geo::Replicator).to receive(:bulk_create_events).with(events)
+
+      described_class.new.perform(events)
+    end
+  end
+end
diff --git a/spec/models/upload_spec.rb b/spec/models/upload_spec.rb
index 0ac684cd04c7221b484b066120aad6c1c6c791ca..7e2c97c8425334721f2ce54b72f0aace7ede2195 100644
--- a/spec/models/upload_spec.rb
+++ b/spec/models/upload_spec.rb
@@ -82,6 +82,18 @@
     end
   end
 
+  describe '#relative_path' do
+    it "delegates to the uploader's relative_path method" do
+      uploader = spy('FakeUploader')
+      upload = described_class.new(path: '/tmp/secret/file.jpg', store: ObjectStorage::Store::LOCAL)
+      expect(upload).to receive(:uploader_class).and_return(uploader)
+
+      upload.relative_path
+
+      expect(uploader).to have_received(:relative_path).with(upload)
+    end
+  end
+
   describe '#calculate_checksum!' do
     let(:upload) do
       described_class.new(path: __FILE__,