diff --git a/ee/app/models/virtual_registries/container/cache/entry.rb b/ee/app/models/virtual_registries/container/cache/entry.rb
index ddd819dfd839b5b4d90f84de8d1ce21411216350..37a18d702e1a9cb3e7a5729e5614175b424f14bf 100644
--- a/ee/app/models/virtual_registries/container/cache/entry.rb
+++ b/ee/app/models/virtual_registries/container/cache/entry.rb
@@ -25,6 +25,8 @@ class Entry < ApplicationRecord
         sha_attribute :file_sha1
         sha_attribute :file_md5
 
+        counter_attribute :downloads_count, touch: :downloaded_at
+
         validates :group, top_level_group: true, presence: true
         validates :relative_path,
           :object_storage_key,
@@ -41,11 +43,65 @@ class Entry < ApplicationRecord
         validates :object_storage_key, uniqueness: { scope: :relative_path }
         validates :file, presence: true
 
+        mount_file_store_uploader ::VirtualRegistries::Cache::EntryUploader
+
         before_validation :set_object_storage_key, if: -> { object_storage_key.blank? && upstream }
         attr_readonly :object_storage_key
 
         scope :requiring_cleanup, ->(n_days_to_keep) { where(downloaded_at: ...(Time.current - n_days_to_keep.days)) }
 
+        # Upper bound on the attempts made by .create_or_update_by! before the validation error
+        # is re-raised, so that a conflict that never clears can't make it loop forever.
+        CREATE_OR_UPDATE_MAX_ATTEMPTS = 3
+
+        # Creates or updates the cache entry identified by the upstream, group_id and relative_path.
+        #
+        # Given that we have chances that this function is not executed in isolation, we can't use
+        # safe_find_or_create_by. We check for existence and, when saving fails because
+        # relative_path is already taken, we rescue the error and look the record up again.
+        #
+        # upstream:, group_id:, relative_path: - attributes used to find or initialize the record.
+        # updates: - Hash of attributes applied to that record with update!.
+        #
+        # Returns the saved record.
+        # Raises ActiveRecord::RecordInvalid when the record is invalid for any other reason, or
+        # when relative_path is still taken after CREATE_OR_UPDATE_MAX_ATTEMPTS attempts.
+        def self.create_or_update_by!(upstream:, group_id:, relative_path:, updates: {})
+          attempts = 0
+
+          begin
+            attempts += 1
+
+            default.find_or_initialize_by(
+              upstream: upstream,
+              group_id: group_id,
+              relative_path: relative_path
+            ).tap do |record|
+              record.update!(**updates)
+            end
+          rescue ActiveRecord::RecordInvalid => invalid
+            taken = invalid.record&.errors&.of_kind?(:relative_path, :taken)
+
+            # in case of a race condition, retry the block (a bounded number of times)
+            retry if taken && attempts < CREATE_OR_UPDATE_MAX_ATTEMPTS
+
+            # otherwise, bubble up the error
+            raise
+          end
+        end
+
+        # Returns the last path component of relative_path, or nil when relative_path is not set.
+        def filename
+          return unless relative_path
+
+          File.basename(relative_path)
+        end
+
+        # Increments the downloads_count counter attribute by 1.
+        def bump_downloads_count
+          increment_downloads_count(1)
+        end
+
         private
 
         def set_object_storage_key
diff --git a/ee/app/services/virtual_registries/container/cache/entries/create_or_update_service.rb b/ee/app/services/virtual_registries/container/cache/entries/create_or_update_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..fec565352fcf3e31d7a806eb32e588932e7744ed
--- /dev/null
+++ b/ee/app/services/virtual_registries/container/cache/entries/create_or_update_service.rb
@@ -0,0 +1,92 @@
+# frozen_string_literal: true
+
+module VirtualRegistries
+  module Container
+    module Cache
+      module Entries
+        # Creates or updates the cache entry of an upstream for the given path and file,
+        # then bumps the downloads counter of that entry.
+        class CreateOrUpdateService < ::BaseContainerService
+          alias_method :upstream, :container
+
+          ERRORS = {
+            unauthorized: ServiceResponse.error(message: 'Unauthorized', reason: :unauthorized),
+            path_not_present: ServiceResponse.error(message: 'Parameter path not present', reason: :path_not_present),
+            file_not_present: ServiceResponse.error(message: 'Parameter file not present', reason: :file_not_present)
+          }.freeze
+
+          def initialize(upstream:, current_user: nil, params: {})
+            super(container: upstream, current_user: current_user, params: params)
+          end
+
+          # Returns a ServiceResponse: a success with the cache entry as payload, or an error
+          # whose reason is :path_not_present, :file_not_present, :unauthorized or
+          # :persistence_error. Any StandardError raised along the way is tracked and
+          # reported as :persistence_error instead of being raised.
+          def execute
+            return ERRORS[:path_not_present] unless path.present?
+            return ERRORS[:file_not_present] unless file.present?
+            return ERRORS[:unauthorized] unless allowed?
+
+            now = Time.zone.now
+            updates = {
+              upstream_etag: etag,
+              upstream_checked_at: now,
+              file: file,
+              size: file.size,
+              file_sha1: file.sha1,
+              file_md5: file.md5,
+              content_type: content_type
+            }.compact_blank
+
+            ce = ::VirtualRegistries::Container::Cache::Entry.create_or_update_by!(
+              group_id: upstream.group_id,
+              upstream: upstream,
+              relative_path: relative_path,
+              updates: updates
+            )
+
+            ce.bump_downloads_count
+
+            ServiceResponse.success(payload: { cache_entry: ce })
+          rescue StandardError => e
+            Gitlab::ErrorTracking.track_exception(
+              e,
+              upstream_id: upstream.id,
+              group_id: upstream.group_id,
+              class: self.class.name
+            )
+            ServiceResponse.error(message: e.message, reason: :persistence_error)
+          end
+
+          private
+
+          def allowed?
+            can?(current_user, :read_virtual_registry, upstream)
+          end
+
+          def file
+            params[:file]
+          end
+
+          def path
+            params[:path]
+          end
+
+          # The path param, prefixed with a slash when it doesn't already start with one.
+          def relative_path
+            path.start_with?('/') ? path : "/#{path}"
+          end
+
+          def etag
+            params[:etag]
+          end
+
+          def content_type
+            params[:content_type]
+          end
+        end
+      end
+    end
+  end
+end
diff --git a/ee/spec/models/virtual_registries/container/cache/entry_spec.rb b/ee/spec/models/virtual_registries/container/cache/entry_spec.rb
index 9469e4bbccbdc49054e2fbcca8c466d37cad60f9..b20a18627883932b4377213ecaa5fb8341ebf103 100644
--- a/ee/spec/models/virtual_registries/container/cache/entry_spec.rb
+++ b/ee/spec/models/virtual_registries/container/cache/entry_spec.rb
@@ -149,4 +149,95 @@
       let(:find_model) { described_class.last }
     end
   end
+
+  describe '.create_or_update_by!' do
+    let_it_be(:upstream) { create(:virtual_registries_container_upstream) }
+    let_it_be(:group) { create(:group) }
+
+    let(:relative_path) { '/test/path' }
+    let(:updates) { { file_sha1: 'da39a3ee5e6b4b0d3255bfef95601890afd80709' } }
+
+    let(:size) { 10.bytes }
+
+    subject(:create_or_update) do
+      Tempfile.create('test.txt') do |file|
+        file.write('test')
+        described_class.create_or_update_by!(
+          upstream: upstream,
+          group_id: upstream.group_id,
+          relative_path: '/test',
+          updates: { file: file, size: size, file_sha1: '4e1243bd22c66e76c2ba9eddc1f91394e57f9f95' }
+        )
+      end
+    end
+
+    context 'with parallel execution' do
+      it 'creates or update the existing record' do
+        expect { with_threads { create_or_update } }.to change { described_class.count }.by(1)
+      end
+    end
+
+    context 'with invalid updates' do
+      let(:size) { nil }
+
+      it 'bubbles up the error' do
+        expect { create_or_update }.to not_change { described_class.count }
+          .and raise_error(ActiveRecord::RecordInvalid)
+      end
+    end
+  end
+
+  describe '#filename' do
+    let(:cache_entry) { build(:virtual_registries_container_cache_entry) }
+
+    subject { cache_entry.filename }
+
+    it { is_expected.to eq(File.basename(cache_entry.relative_path)) }
+
+    context 'when relative_path is nil' do
+      before do
+        cache_entry.relative_path = nil
+      end
+
+      it { is_expected.to be_nil }
+    end
+  end
+
+  describe '#bump_downloads_count' do
+    let_it_be(:cache_entry) { create(:virtual_registries_container_cache_entry) }
+
+    subject(:bump) { cache_entry.bump_downloads_count }
+
+    it 'enqueues the update', :sidekiq_inline do
+      expect(FlushCounterIncrementsWorker)
+        .to receive(:perform_in)
+        .with(Gitlab::Counters::BufferedCounter::WORKER_DELAY, described_class.name, cache_entry.id, 'downloads_count')
+        .and_call_original
+
+      expect { bump }.to change { cache_entry.reload.downloads_count }.by(1)
+        .and change { cache_entry.downloaded_at }
+    end
+  end
+
+  def with_threads(count: 5, &block)
+    return unless block
+
+    # create a race condition - structure from https://blog.arkency.com/2015/09/testing-race-conditions/
+    wait_for_it = true
+
+    threads = Array.new(count) do
+      Thread.new do
+        # each thread must checkout its own connection
+        ApplicationRecord.connection_pool.with_connection do
+          # A loop to make threads busy until we `join` them
+          true while wait_for_it
+
+          yield
+        end
+      end
+    end
+
+    wait_for_it = false
+    threads.each(&:join)
+  end
 end
diff --git a/ee/spec/services/virtual_registries/container/cache/entries/create_or_update_service_spec.rb b/ee/spec/services/virtual_registries/container/cache/entries/create_or_update_service_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..9d3e30997b1c408f10ac0f7dccb4604367a56769
--- /dev/null
+++ b/ee/spec/services/virtual_registries/container/cache/entries/create_or_update_service_spec.rb
@@ -0,0 +1,146 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe VirtualRegistries::Container::Cache::Entries::CreateOrUpdateService, :aggregate_failures, feature_category: :virtual_registry do
+  let_it_be(:registry) { create(:virtual_registries_container_registry) }
+  let_it_be(:project) { create(:project, namespace: registry.group) }
+  let_it_be(:user) { create(:user, owner_of: project) }
+  let_it_be(:path) { "#{path}-#{SecureRandom.hex(4)}" }
+  let_it_be(:upstream) { create(:virtual_registries_container_upstream, registries: [registry]) }
+
+  let(:etag) { 'test' }
+  let(:content_type) { 'text/xml' }
+  let(:params) { { path: path, file: file, etag: etag, content_type: content_type } }
+  let(:file) do
+    UploadedFile.new(
+      Tempfile.new(etag).path,
+      sha1: '4e1243bd22c66e76c2ba9eddc1f91394e57f9f83',
+      md5: 'd8e8fca2dc0f896fd7cb4cb0031ba249'
+    )
+  end
+
+  let!(:service) do
+    described_class.new(upstream: upstream, current_user: user, params: params)
+  end
+
+  describe '#execute' do
+    subject(:execute) { service.execute }
+
+    shared_examples 'returning a service response success response' do
+      shared_examples 'creating a new cache entry' do
+        it 'returns a success service response', :freeze_time do
+          expect_next_instance_of(::VirtualRegistries::Container::Cache::Entry) do |expected_cache_entry|
+            expect(expected_cache_entry).to receive(:bump_downloads_count)
+          end
+
+          expect { execute }.to change { upstream.cache_entries.count }.by(1)
+          expect(execute).to be_success
+
+          last_cache_entry = upstream.cache_entries.last
+          expect(execute.payload).to eq(cache_entry: last_cache_entry)
+
+          expect(last_cache_entry).to have_attributes(
+            group_id: registry.group.id,
+            upstream_checked_at: Time.zone.now,
+            relative_path: "/#{path}",
+            upstream_etag: etag,
+            content_type: content_type,
+            file_sha1: '4e1243bd22c66e76c2ba9eddc1f91394e57f9f83',
+            file_md5: 'd8e8fca2dc0f896fd7cb4cb0031ba249'
+          )
+        end
+      end
+
+      shared_examples 'updating an existing cache entry' do
+        let_it_be(:path) { "#{path}-existing" }
+        let_it_be(:cache_entry) do
+          create(
+            :virtual_registries_container_cache_entry,
+            group: upstream.group,
+            upstream: upstream,
+            relative_path: "/#{path}"
+          )
+        end
+
+        it 'updates it', :freeze_time, :sidekiq_inline do
+          last_cache_entry = upstream.cache_entries.last
+
+          expect { execute }.to not_change { upstream.cache_entries.count }
+            .and change { last_cache_entry.reset.downloads_count }.by(1)
+            .and change { last_cache_entry.downloaded_at }.to(Time.current)
+
+          is_expected.to be_success.and have_attributes(payload: { cache_entry: last_cache_entry })
+
+          expect(last_cache_entry).to have_attributes(
+            upstream_checked_at: Time.zone.now,
+            upstream_etag: etag
+          )
+        end
+      end
+
+      it_behaves_like 'creating a new cache entry'
+      it_behaves_like 'updating an existing cache entry'
+
+      context 'with a nil content_type' do
+        let(:params) { super().merge(content_type: nil) }
+
+        it 'creates a cache entry with a default content_type' do
+          expect { execute }.to change { upstream.cache_entries.count }.by(1)
+          expect(execute).to be_success
+
+          expect(upstream.cache_entries.last).to have_attributes(content_type: 'application/octet-stream')
+        end
+      end
+
+      context 'with an error' do
+        it 'returns an error response and log the error' do
+          expect(::VirtualRegistries::Container::Cache::Entry)
+            .to receive(:create_or_update_by!).and_raise(ActiveRecord::RecordInvalid)
+          expect(::Gitlab::ErrorTracking).to receive(:track_exception)
+            .with(
+              instance_of(ActiveRecord::RecordInvalid),
+              upstream_id: upstream.id,
+              group_id: upstream.group_id,
+              class: described_class.name
+            )
+          expect { execute }.not_to change { upstream.cache_entries.count }
+        end
+      end
+    end
+
+    context 'with a User' do
+      it_behaves_like 'returning a service response success response'
+    end
+
+    context 'with a DeployToken' do
+      let_it_be(:user) { create(:deploy_token, :group, groups: [registry.group], read_virtual_registry: true) }
+
+      it_behaves_like 'returning a service response success response'
+    end
+
+    context 'with no path' do
+      let(:path) { nil }
+
+      it { is_expected.to eq(described_class::ERRORS[:path_not_present]) }
+    end
+
+    context 'with no file' do
+      let(:file) { nil }
+
+      it { is_expected.to eq(described_class::ERRORS[:file_not_present]) }
+    end
+
+    context 'with no upstream' do
+      let_it_be(:upstream) { nil }
+
+      it { is_expected.to eq(described_class::ERRORS[:unauthorized]) }
+    end
+
+    context 'with no user' do
+      let(:user) { nil }
+
+      it { is_expected.to eq(described_class::ERRORS[:unauthorized]) }
+    end
+  end
+end