From 905a38235fe7969196a0df765d18831acf9f9852 Mon Sep 17 00:00:00 2001 From: David Fernandez Date: Thu, 16 Jun 2022 17:38:04 +0200 Subject: [PATCH] Add the packages execute cleanup policy service Update the related services to support this change Changelog: added --- app/models/packages/cleanup/policy.rb | 4 + .../cleanup/execute_policy_service.rb | 98 +++++++++++ ...k_package_files_for_destruction_service.rb | 35 +++- ..._add_index_on_installable_package_files.rb | 20 +++ ...packages_index_on_project_id_and_status.rb | 22 +++ db/schema_migrations/20220617142124 | 1 + db/schema_migrations/20220617143228 | 1 + db/structure.sql | 4 +- spec/models/packages/cleanup/policy_spec.rb | 12 ++ .../cleanup/execute_policy_service_spec.rb | 163 ++++++++++++++++++ ...kage_files_for_destruction_service_spec.rb | 47 ++++- 11 files changed, 396 insertions(+), 11 deletions(-) create mode 100644 app/services/packages/cleanup/execute_policy_service.rb create mode 100644 db/post_migrate/20220617142124_add_index_on_installable_package_files.rb create mode 100644 db/post_migrate/20220617143228_replace_packages_index_on_project_id_and_status.rb create mode 100644 db/schema_migrations/20220617142124 create mode 100644 db/schema_migrations/20220617143228 create mode 100644 spec/services/packages/cleanup/execute_policy_service_spec.rb diff --git a/app/models/packages/cleanup/policy.rb b/app/models/packages/cleanup/policy.rb index d7df90a4ce00c1..c0585a52e6ebea 100644 --- a/app/models/packages/cleanup/policy.rb +++ b/app/models/packages/cleanup/policy.rb @@ -27,6 +27,10 @@ def set_next_run_at # fixed cadence of 12 hours self.next_run_at = Time.zone.now + 12.hours end + + def keep_n_duplicated_package_files_disabled? + keep_n_duplicated_package_files == 'all' + end end end end diff --git a/app/services/packages/cleanup/execute_policy_service.rb b/app/services/packages/cleanup/execute_policy_service.rb new file mode 100644 index 00000000000000..b432f6d0acb33f --- /dev/null +++ b/app/services/packages/cleanup/execute_policy_service.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +module Packages + module Cleanup + class ExecutePolicyService + include Gitlab::Utils::StrongMemoize + + MAX_EXECUTION_TIME = 250.seconds + + DUPLICATED_FILES_BATCH_SIZE = 10_000 + MARK_PACKAGE_FILES_FOR_DESTRUCTION_SERVICE_BATCH_SIZE = 200 + + def initialize(policy) + @policy = policy + @counts = { + marked_package_files_total_count: 0, + unique_package_id_and_file_name_total_count: 0 + } + end + + def execute + cleanup_duplicated_files + end + + private + + def cleanup_duplicated_files + return if @policy.keep_n_duplicated_package_files_disabled? + + result = installable_package_files.each_batch(of: DUPLICATED_FILES_BATCH_SIZE) do |package_files| + break :timeout if cleanup_duplicated_files_on(package_files) == :timeout + end + + response_success(timeout: result == :timeout) + end + + def cleanup_duplicated_files_on(package_files) + unique_package_id_and_file_name_from(package_files).each do |package_id, file_name| + result = remove_duplicated_files_for(package_id: package_id, file_name: file_name) + @counts[:marked_package_files_total_count] += result.payload[:marked_package_files_count] + @counts[:unique_package_id_and_file_name_total_count] += 1 + + break :timeout unless result.success? + end + end + + def unique_package_id_and_file_name_from(package_files) + # This is a highly custom query for this service, that's why it's not in the model. + # rubocop: disable CodeReuse/ActiveRecord + package_files.group(:package_id, :file_name) + .having("COUNT(*) > #{@policy.keep_n_duplicated_package_files}") + .pluck(:package_id, :file_name) + # rubocop: enable CodeReuse/ActiveRecord + end + + def remove_duplicated_files_for(package_id:, file_name:) + base = ::Packages::PackageFile.for_package_ids(package_id) + .installable + .with_file_name(file_name) + ids_to_keep = base.recent + .limit(@policy.keep_n_duplicated_package_files) + .pluck_primary_key + + duplicated_package_files = base.id_not_in(ids_to_keep) + ::Packages::MarkPackageFilesForDestructionService.new(duplicated_package_files) + .execute(batch_deadline: batch_deadline, batch_size: MARK_PACKAGE_FILES_FOR_DESTRUCTION_SERVICE_BATCH_SIZE) + end + + def project + @policy.project + end + + def installable_package_files + ::Packages::PackageFile.installable + .for_package_ids( + ::Packages::Package.installable + .for_projects(project.id) + ) + end + + def batch_deadline + strong_memoize(:batch_deadline) do + MAX_EXECUTION_TIME.from_now + end + end + + def response_success(timeout:) + ServiceResponse.success( + message: "Packages cleanup policy executed for project #{project.id}", + payload: { + timeout: timeout, + counts: @counts + } + ) + end + end + end +end diff --git a/app/services/packages/mark_package_files_for_destruction_service.rb b/app/services/packages/mark_package_files_for_destruction_service.rb index 3672b44b40986b..e7fdd88843a84e 100644 --- a/app/services/packages/mark_package_files_for_destruction_service.rb +++ b/app/services/packages/mark_package_files_for_destruction_service.rb @@ -9,18 +9,41 @@ def initialize(package_files) @package_files = package_files end - def execute - @package_files.each_batch(of: BATCH_SIZE) do |batched_package_files| - batched_package_files.update_all(status: :pending_destruction) + def execute(batch_deadline: nil, batch_size: BATCH_SIZE) + timeout = false + updates_count = 0 + min_batch_size = [batch_size, BATCH_SIZE].min + + @package_files.each_batch(of: min_batch_size) do |batched_package_files| + if batch_deadline && Time.zone.now > batch_deadline + timeout = true + break + end + + updates_count += batched_package_files.update_all(status: :pending_destruction) end - service_response_success('Package files are now pending destruction') + payload = { marked_package_files_count: updates_count } + + return response_error(payload) if timeout + + response_success(payload) end private - def service_response_success(message) - ServiceResponse.success(message: message) + def response_success(payload) + ServiceResponse.success( + message: 'Package files are now pending destruction', + payload: payload + ) + end + + def response_error(payload) + ServiceResponse.error( + message: 'Timeout while marking package files as pending destruction', + payload: payload + ) end end end diff --git a/db/post_migrate/20220617142124_add_index_on_installable_package_files.rb b/db/post_migrate/20220617142124_add_index_on_installable_package_files.rb new file mode 100644 index 00000000000000..e74c6c0935eb18 --- /dev/null +++ b/db/post_migrate/20220617142124_add_index_on_installable_package_files.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +class AddIndexOnInstallablePackageFiles < Gitlab::Database::Migration[2.0] + disable_ddl_transaction! + + INDEX_NAME = 'idx_pkgs_installable_package_files_on_package_id_id_file_name' + # See https://gitlab.com/gitlab-org/gitlab/-/blob/e3ed2c1f65df2e137fc714485d7d42264a137968/app/models/packages/package_file.rb#L16 + DEFAULT_STATUS = 0 + + def up + add_concurrent_index :packages_package_files, + [:package_id, :id, :file_name], + where: "(status = #{DEFAULT_STATUS})", + name: INDEX_NAME + end + + def down + remove_concurrent_index_by_name :packages_package_files, INDEX_NAME + end +end diff --git a/db/post_migrate/20220617143228_replace_packages_index_on_project_id_and_status.rb b/db/post_migrate/20220617143228_replace_packages_index_on_project_id_and_status.rb new file mode 100644 index 00000000000000..d1e70f04468c0c --- /dev/null +++ b/db/post_migrate/20220617143228_replace_packages_index_on_project_id_and_status.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +class ReplacePackagesIndexOnProjectIdAndStatus < Gitlab::Database::Migration[2.0] + disable_ddl_transaction! + + NEW_INDEX_NAME = 'index_packages_packages_on_project_id_and_status_and_id' + OLD_INDEX_NAME = 'index_packages_packages_on_project_id_and_status' + + def up + add_concurrent_index :packages_packages, + [:project_id, :status, :id], + name: NEW_INDEX_NAME + remove_concurrent_index_by_name :packages_packages, OLD_INDEX_NAME + end + + def down + add_concurrent_index :packages_packages, + [:project_id, :status], + name: OLD_INDEX_NAME + remove_concurrent_index_by_name :packages_packages, NEW_INDEX_NAME + end +end diff --git a/db/schema_migrations/20220617142124 b/db/schema_migrations/20220617142124 new file mode 100644 index 00000000000000..c8fd06f2c1086a --- /dev/null +++ b/db/schema_migrations/20220617142124 @@ -0,0 +1 @@ +668404076e9cfc91817b8ae3ec995a69ec0db283153bbe497a81eb83c2188ceb \ No newline at end of file diff --git a/db/schema_migrations/20220617143228 b/db/schema_migrations/20220617143228 new file mode 100644 index 00000000000000..cb4ac555bc3cb5 --- /dev/null +++ b/db/schema_migrations/20220617143228 @@ -0,0 +1 @@ +547fc0071177395133497cbcec9a9d9ed058fe74f632f5e84d9a6416047503f2 \ No newline at end of file diff --git a/db/structure.sql b/db/structure.sql index 9ad896d801cdc9..0d01a57c40cb9f 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -27116,6 +27116,8 @@ CREATE INDEX idx_pkgs_debian_project_distribution_keys_on_distribution_id ON pac CREATE UNIQUE INDEX idx_pkgs_dep_links_on_pkg_id_dependency_id_dependency_type ON packages_dependency_links USING btree (package_id, dependency_id, dependency_type); +CREATE INDEX idx_pkgs_installable_package_files_on_package_id_id_file_name ON packages_package_files USING btree (package_id, id, file_name) WHERE (status = 0); + CREATE INDEX idx_proj_feat_usg_on_jira_dvcs_cloud_last_sync_at_and_proj_id ON project_feature_usages USING btree (jira_dvcs_cloud_last_sync_at, project_id) WHERE (jira_dvcs_cloud_last_sync_at IS NOT NULL); CREATE INDEX idx_proj_feat_usg_on_jira_dvcs_server_last_sync_at_and_proj_id ON project_feature_usages USING btree (jira_dvcs_server_last_sync_at, project_id) WHERE (jira_dvcs_server_last_sync_at IS NOT NULL); @@ -29096,7 +29098,7 @@ CREATE INDEX index_packages_packages_on_project_id_and_created_at ON packages_pa CREATE INDEX index_packages_packages_on_project_id_and_package_type ON packages_packages USING btree (project_id, package_type); -CREATE INDEX index_packages_packages_on_project_id_and_status ON packages_packages USING btree (project_id, status); +CREATE INDEX index_packages_packages_on_project_id_and_status_and_id ON packages_packages USING btree (project_id, status, id); CREATE INDEX index_packages_packages_on_project_id_and_version ON packages_packages USING btree (project_id, version); diff --git a/spec/models/packages/cleanup/policy_spec.rb b/spec/models/packages/cleanup/policy_spec.rb index c08ae4aa7e7309..9f4568932d234f 100644 --- a/spec/models/packages/cleanup/policy_spec.rb +++ b/spec/models/packages/cleanup/policy_spec.rb @@ -25,4 +25,16 @@ it { is_expected.to contain_exactly(active_policy) } end + + describe '#keep_n_duplicated_package_files_disabled?' do + subject { policy.keep_n_duplicated_package_files_disabled? } + + %w[all 1].each do |value| + context "with value set to #{value}" do + let(:policy) { build(:packages_cleanup_policy, keep_n_duplicated_package_files: value) } + + it { is_expected.to eq(value == 'all') } + end + end + end end diff --git a/spec/services/packages/cleanup/execute_policy_service_spec.rb b/spec/services/packages/cleanup/execute_policy_service_spec.rb new file mode 100644 index 00000000000000..93335c4a821a87 --- /dev/null +++ b/spec/services/packages/cleanup/execute_policy_service_spec.rb @@ -0,0 +1,163 @@ +# frozen_string_literal: true +require 'spec_helper' + +RSpec.describe Packages::Cleanup::ExecutePolicyService do + let_it_be(:project) { create(:project) } + let_it_be_with_reload(:policy) { create(:packages_cleanup_policy, project: project) } + + let(:service) { described_class.new(policy) } + + describe '#execute' do + subject(:execute) { service.execute } + + context 'with the keep_n_duplicated_files parameter' do + let_it_be(:package1) { create(:package, project: project) } + let_it_be(:package2) { create(:package, project: project) } + let_it_be(:package3) { create(:package, project: project) } + let_it_be(:package4) { create(:package, :pending_destruction, project: project) } + + let_it_be(:package_file1_1) { create(:package_file, package: package1, file_name: 'file_name1') } + let_it_be(:package_file1_2) { create(:package_file, package: package1, file_name: 'file_name1') } + let_it_be(:package_file1_3) { create(:package_file, package: package1, file_name: 'file_name1') } + + let_it_be(:package_file1_4) { create(:package_file, package: package1, file_name: 'file_name2') } + let_it_be(:package_file1_5) { create(:package_file, package: package1, file_name: 'file_name2') } + let_it_be(:package_file1_6) { create(:package_file, package: package1, file_name: 'file_name2') } + let_it_be(:package_file1_7) do + create(:package_file, :pending_destruction, package: package1, file_name: 'file_name2') + end + + let_it_be(:package_file2_1) { create(:package_file, package: package2, file_name: 'file_name1') } + let_it_be(:package_file2_2) { create(:package_file, package: package2, file_name: 'file_name1') } + let_it_be(:package_file2_3) { create(:package_file, package: package2, file_name: 'file_name1') } + let_it_be(:package_file2_4) { create(:package_file, package: package2, file_name: 'file_name1') } + + let_it_be(:package_file3_1) { create(:package_file, package: package3, file_name: 'file_name_test') } + + let_it_be(:package_file4_1) { create(:package_file, package: package4, file_name: 'file_name1') } + let_it_be(:package_file4_2) { create(:package_file, package: package4, file_name: 'file_name1') } + + let(:package_files_1) { package1.package_files.installable } + let(:package_files_2) { package2.package_files.installable } + let(:package_files_3) { package3.package_files.installable } + + context 'set to less than the total number of duplicated files' do + before do + # for each package file duplicate, we keep only the most recent one + policy.update!(keep_n_duplicated_package_files: '1') + end + + shared_examples 'keeping the most recent package files' do + let(:response_payload) do + { + counts: { + marked_package_files_total_count: 7, + unique_package_id_and_file_name_total_count: 3 + }, + timeout: false + } + end + + it 'only keeps the most recent package files' do + expect { execute }.to change { ::Packages::PackageFile.installable.count }.by(-7) + + expect(package_files_1).to contain_exactly(package_file1_3, package_file1_6) + expect(package_files_2).to contain_exactly(package_file2_4) + expect(package_files_3).to contain_exactly(package_file3_1) + + expect(execute).to be_success + expect(execute.message).to eq("Packages cleanup policy executed for project #{project.id}") + expect(execute.payload).to eq(response_payload) + end + end + + it_behaves_like 'keeping the most recent package files' + + context 'when the service needs to loop' do + before do + stub_const("#{described_class.name}::DUPLICATED_FILES_BATCH_SIZE", 2) + end + + it_behaves_like 'keeping the most recent package files' do + before do + expect(::Packages::MarkPackageFilesForDestructionService) + .to receive(:new).exactly(3).times.and_call_original + end + end + + context 'when a timeout is hit' do + let(:response_payload) do + { + counts: { + marked_package_files_total_count: 4, + unique_package_id_and_file_name_total_count: 3 + }, + timeout: true + } + end + + let(:service_timeout_response) do + ServiceResponse.error( + message: 'Timeout while marking package files as pending destruction', + payload: { marked_package_files_count: 0 } + ) + end + + before do + mock_service_timeout(on_iteration: 3) + end + + it 'keeps part of the most recent package files' do + expect { execute } + .to change { ::Packages::PackageFile.installable.count }.by(-4) + .and not_change { package_files_2.count } # untouched because of the timeout + .and not_change { package_files_3.count } # untouched because of the timeout + + expect(package_files_1).to contain_exactly(package_file1_3, package_file1_6) + expect(execute).to be_success + expect(execute.message).to eq("Packages cleanup policy executed for project #{project.id}") + expect(execute.payload).to eq(response_payload) + end + + def mock_service_timeout(on_iteration:) + execute_call_count = 1 + expect_next_instances_of(::Packages::MarkPackageFilesForDestructionService, 3) do |service| + expect(service).to receive(:execute).and_wrap_original do |m, *args| + # timeout if we are on the right iteration + if execute_call_count == on_iteration + service_timeout_response + else + execute_call_count += 1 + m.call(*args) + end + end + end + end + end + end + end + + context 'set to more than the total number of duplicated files' do + before do + # using the biggest value for keep_n_duplicated_package_files + policy.update!(keep_n_duplicated_package_files: '50') + end + + it 'keeps all package files' do + expect { execute }.not_to change { ::Packages::PackageFile.installable.count } + end + end + + context 'set to all' do + before do + policy.update!(keep_n_duplicated_package_files: 'all') + end + + it 'skips the policy' do + expect(::Packages::MarkPackageFilesForDestructionService).not_to receive(:new) + expect { execute }.not_to change { ::Packages::PackageFile.installable.count } + end + end + end + end +end diff --git a/spec/services/packages/mark_package_files_for_destruction_service_spec.rb b/spec/services/packages/mark_package_files_for_destruction_service_spec.rb index a836de1f7f6633..66534338003e30 100644 --- a/spec/services/packages/mark_package_files_for_destruction_service_spec.rb +++ b/spec/services/packages/mark_package_files_for_destruction_service_spec.rb @@ -6,9 +6,11 @@ let(:service) { described_class.new(package_files) } describe '#execute', :aggregate_failures do - subject { service.execute } + let(:batch_deadline) { nil } - shared_examples 'executing successfully' do + subject { service.execute(batch_deadline: batch_deadline) } + + shared_examples 'executing successfully' do |marked_package_files_count: 0| it 'marks package files for destruction' do expect { subject } .to change { ::Packages::PackageFile.pending_destruction.count }.by(package_files.size) @@ -17,6 +19,7 @@ it 'executes successfully' do expect(subject).to be_success expect(subject.message).to eq('Package files are now pending destruction') + expect(subject.payload).to eq(marked_package_files_count: marked_package_files_count) end end @@ -30,13 +33,49 @@ let_it_be(:package_file) { create(:package_file) } let_it_be(:package_files) { ::Packages::PackageFile.id_in(package_file.id) } - it_behaves_like 'executing successfully' + it_behaves_like 'executing successfully', marked_package_files_count: 1 end context 'with many package files' do let_it_be(:package_files) { ::Packages::PackageFile.id_in(create_list(:package_file, 3).map(&:id)) } - it_behaves_like 'executing successfully' + it_behaves_like 'executing successfully', marked_package_files_count: 3 + + context 'with a batch deadline' do + let_it_be(:batch_deadline) { 250.seconds.from_now } + + context 'when the deadline is not hit' do + before do + expect(Time.zone).to receive(:now).and_return(batch_deadline - 10.seconds) + end + + it_behaves_like 'executing successfully', marked_package_files_count: 3 + end + + context 'when the deadline is hit' do + it 'does not execute the batch loop' do + expect(Time.zone).to receive(:now).and_return(batch_deadline + 10.seconds) + expect { subject }.to not_change { ::Packages::PackageFile.pending_destruction.count } + expect(subject).to be_error + expect(subject.message).to eq('Timeout while marking package files as pending destruction') + expect(subject.payload).to eq(marked_package_files_count: 0) + end + end + end + + context 'when a batch size is defined' do + let_it_be(:batch_deadline) { 250.seconds.from_now } + + let(:batch_size) { 2 } + + subject { service.execute(batch_deadline: batch_deadline, batch_size: batch_size) } + + before do + expect(Time.zone).to receive(:now).twice.and_call_original + end + + it_behaves_like 'executing successfully', marked_package_files_count: 3 + end end context 'with an error during the update' do -- GitLab