diff --git a/app/services/import/github/gists_import_service.rb b/app/services/import/github/gists_import_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..df1bbe306e7820c4881d79f916f86ea4a900942d
--- /dev/null
+++ b/app/services/import/github/gists_import_service.rb
@@ -0,0 +1,35 @@
+# frozen_string_literal: true
+
+module Import
+  module Github
+    # Starts a GitHub Gists import for the current user unless one is already in progress.
+    class GistsImportService < ::BaseService
+      def initialize(user, params)
+        @current_user = user
+        @params = params
+      end
+
+      def execute
+        return error('Import already in progress', 422) if import_status.started?
+
+        start_import
+        success
+      end
+
+      private
+
+      def import_status
+        @import_status ||= Gitlab::GithubGistsImport::Status.new(current_user.id)
+      end
+
+      def encrypted_token
+        Gitlab::CryptoHelper.aes256_gcm_encrypt(params[:github_access_token])
+      end
+
+      def start_import
+        Gitlab::GithubGistsImport::StartImportWorker.perform_async(current_user.id, encrypted_token)
+        import_status.start!
+      end
+    end
+  end
+end
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index bba5ef370519a0012e9049aaa5c4b77bf2f73f2f..9540c33caba590e9494bf912415a80b69c9b64d3 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -1065,6 +1065,15 @@
   :weight: 1
   :idempotent: false
   :tags: []
+- :name: github_gists_importer:github_gists_import_finish_import
+  :worker_name: Gitlab::GithubGistsImport::FinishImportWorker
+  :feature_category: :importers
+  :has_external_dependencies: false
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: github_gists_importer:github_gists_import_import_gist
   :worker_name: Gitlab::GithubGistsImport::ImportGistWorker
   :feature_category: :importers
@@ -1074,6 +1083,15 @@
   :weight: 1
   :idempotent: false
   :tags: []
+- :name: github_gists_importer:github_gists_import_start_import
+  :worker_name: Gitlab::GithubGistsImport::StartImportWorker
+  :feature_category: :importers
+  :has_external_dependencies: true
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: false
+  :tags: []
 - :name: github_importer:github_import_attachments_import_issue
   :worker_name: Gitlab::GithubImport::Attachments::ImportIssueWorker
   :feature_category: :importers
diff --git a/app/workers/gitlab/github_gists_import/finish_import_worker.rb b/app/workers/gitlab/github_gists_import/finish_import_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..1989b6314ea77a523f61af8ea3c6c88efc491cde
--- /dev/null
+++ b/app/workers/gitlab/github_gists_import/finish_import_worker.rb
@@ -0,0 +1,48 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module GithubGistsImport
+    # Polls the JobWaiter for the scheduled gist import jobs and re-enqueues itself every INTERVAL
+    # until none remain, then marks the user's import as finished.
+    class FinishImportWorker
+      include ApplicationWorker
+
+      data_consistency :always
+      queue_namespace :github_gists_importer
+      feature_category :importers
+      idempotent!
+
+      sidekiq_options dead: false, retry: 5
+
+      sidekiq_retries_exhausted do |msg, _|
+        Gitlab::GithubGistsImport::Status.new(msg['args'][0]).fail!
+      end
+
+      INTERVAL = 30.seconds.to_i
+      BLOCKING_WAIT_TIME = 5
+
+      def perform(user_id, waiter_key, remaining)
+        waiter = wait_for_jobs(waiter_key, remaining)
+
+        if waiter.nil?
+          Gitlab::GithubGistsImport::Status.new(user_id).finish!
+
+          Gitlab::GithubImport::Logger.info(user_id: user_id, message: 'GitHub Gists import finished')
+        else
+          self.class.perform_in(INTERVAL, user_id, waiter.key, waiter.jobs_remaining)
+        end
+      end
+
+      private
+
+      def wait_for_jobs(key, remaining)
+        waiter = JobWaiter.new(remaining, key)
+        waiter.wait(BLOCKING_WAIT_TIME)
+
+        return if waiter.jobs_remaining == 0
+
+        waiter
+      end
+    end
+  end
+end
diff --git a/app/workers/gitlab/github_gists_import/start_import_worker.rb b/app/workers/gitlab/github_gists_import/start_import_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..33c9161171985e52e656c2686a313cdc6719ee10
--- /dev/null
+++ b/app/workers/gitlab/github_gists_import/start_import_worker.rb
@@ -0,0 +1,66 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module GithubGistsImport
+    # Runs the GitHub gists importer for a user. On success it schedules FinishImportWorker, on a rate
+    # limit it re-enqueues itself after `next_attempt_in`, otherwise it logs and raises the error.
+    class StartImportWorker # rubocop:disable Scalability/IdempotentWorker
+      include ApplicationWorker
+
+      data_consistency :always
+      queue_namespace :github_gists_importer
+      feature_category :importers
+
+      sidekiq_options dead: false, retry: 5
+
+      worker_has_external_dependencies!
+
+      sidekiq_retries_exhausted do |msg, _|
+        Gitlab::GithubGistsImport::Status.new(msg['args'][0]).fail!
+
+        user = User.find(msg['args'][0])
+        Gitlab::GithubImport::PageCounter.new(user, :gists, 'github-gists-importer').expire!
+      end
+
+      def perform(user_id, encrypted_token)
+        logger.info(structured_payload(user_id: user_id, message: 'starting importer'))
+
+        user = User.find(user_id)
+        decrypted_token = Gitlab::CryptoHelper.aes256_gcm_decrypt(encrypted_token)
+        result = Gitlab::GithubGistsImport::Importer::GistsImporter.new(user, decrypted_token).execute
+
+        if result.success?
+          schedule_finish_worker(user_id, result.waiter)
+        elsif result.next_attempt_in
+          schedule_next_attempt(result.next_attempt_in, user_id, encrypted_token)
+        else
+          log_error_and_raise!(user_id, result.error)
+        end
+      end
+
+      private
+
+      def schedule_finish_worker(user_id, waiter)
+        logger.info(structured_payload(user_id: user_id, message: 'importer finished'))
+
+        Gitlab::GithubGistsImport::FinishImportWorker.perform_async(user_id, waiter.key, waiter.jobs_remaining)
+      end
+
+      def schedule_next_attempt(next_attempt_in, user_id, encrypted_token)
+        logger.info(structured_payload(user_id: user_id, message: 'rate limit reached'))
+
+        self.class.perform_in(next_attempt_in, user_id, encrypted_token)
+      end
+
+      def log_error_and_raise!(user_id, error)
+        logger.error(structured_payload(user_id: user_id, message: 'import failed', 'error.message': error.message))
+
+        raise(error)
+      end
+
+      def logger
+        Gitlab::GithubImport::Logger
+      end
+    end
+  end
+end
diff --git a/lib/gitlab/github_gists_import/status.rb b/lib/gitlab/github_gists_import/status.rb
new file mode 100644
index 0000000000000000000000000000000000000000..e997eb0bf885ebd5dadcb48c6008bb0373a9c737
--- /dev/null
+++ b/lib/gitlab/github_gists_import/status.rb
@@ -0,0 +1,46 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module GithubGistsImport
+    # Per-user GitHub Gists import status stored in Redis (SharedState). 'started' is kept without a
+    # TTL; 'failed' and 'finished' expire after EXPIRATION_TIME.
+    class Status
+      IMPORT_STATUS_KEY = 'gitlab:github-gists-import:%{user_id}'
+      EXPIRATION_TIME = 24.hours
+
+      def initialize(user_id)
+        @user_id = user_id
+      end
+
+      def start!
+        change_status('started')
+      end
+
+      def fail!
+        change_status('failed')
+      end
+
+      def finish!
+        change_status('finished')
+      end
+
+      def started?
+        Gitlab::Redis::SharedState.with { |redis| redis.get(import_status_key) == 'started' }
+      end
+
+      private
+
+      def change_status(status_name)
+        Gitlab::Redis::SharedState.with do |redis|
+          # SET ... EX writes value and TTL atomically; a plain SET (for 'started') clears any earlier TTL.
+          expiry = status_name == 'started' ? nil : EXPIRATION_TIME.to_i
+          redis.set(import_status_key, status_name, ex: expiry)
+        end
+      end
+
+      def import_status_key
+        format(IMPORT_STATUS_KEY, user_id: @user_id)
+      end
+    end
+  end
+end
diff --git a/spec/lib/gitlab/github_gists_import/status_spec.rb b/spec/lib/gitlab/github_gists_import/status_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..4cbbbd430eb2d5a663eceecd8855aad08435a6b4
--- /dev/null
+++ b/spec/lib/gitlab/github_gists_import/status_spec.rb
@@ -0,0 +1,53 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::GithubGistsImport::Status, :clean_gitlab_redis_shared_state, feature_category: :importers do
+  subject(:import_status) { described_class.new(user.id) }
+
+  let_it_be(:user) { create(:user) }
+  let(:key) { "gitlab:github-gists-import:#{user.id}" }
+
+  describe '#start!' do
+    it 'sets started status without an expiry' do
+      import_status.start!
+
+      Gitlab::Redis::SharedState.with do |redis|
+        expect(redis.get(key)).to eq('started')
+        expect(redis.ttl(key)).to eq(-1)
+      end
+    end
+  end
+
+  describe '#fail!' do
+    it 'sets failed status' do
+      import_status.fail!
+
+      Gitlab::Redis::SharedState.with do |redis|
+        expect(redis.get(key)).to eq('failed')
+        expect(redis.ttl(key)).to be_within(5).of(described_class::EXPIRATION_TIME.to_i)
+      end
+    end
+  end
+
+  describe '#finish!' do
+    it 'sets finished status' do
+      import_status.finish!
+
+      Gitlab::Redis::SharedState.with do |redis|
+        expect(redis.get(key)).to eq('finished')
+        expect(redis.ttl(key)).to be_within(5).of(described_class::EXPIRATION_TIME.to_i)
+      end
+    end
+  end
+
+  describe '#started?' do
+    before do
+      Gitlab::Redis::SharedState.with { |redis| redis.set(key, 'started') }
+    end
+
+    it 'checks if status is started' do
+      expect(import_status.started?).to eq(true)
+    end
+  end
+end
diff --git a/spec/services/import/github/gists_import_service_spec.rb b/spec/services/import/github/gists_import_service_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..c5d73e6479d3d37394f7e4a267039b96f96e9b1e
--- /dev/null
+++ b/spec/services/import/github/gists_import_service_spec.rb
@@ -0,0 +1,47 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Import::Github::GistsImportService, feature_category: :importers do
+  subject(:import) { described_class.new(user, params) }
+
+  let_it_be(:user) { create(:user) }
+  let(:params) { { github_access_token: 'token' } }
+  let(:import_status) { instance_double('Gitlab::GithubGistsImport::Status') }
+
+  describe '#execute', :aggregate_failures do
+    before do
+      allow(Gitlab::GithubGistsImport::Status).to receive(:new).and_return(import_status)
+    end
+
+    context 'when import in progress' do
+      let(:expected_result) do
+        {
+          status: :error,
+          http_status: 422,
+          message: 'Import already in progress'
+        }
+      end
+
+      it 'returns error' do
+        expect(import_status).to receive(:started?).and_return(true)
+        expect(import.execute).to eq(expected_result)
+      end
+    end
+
+    context 'when import was not started' do
+      it 'returns success' do
+        encrypted_token = Gitlab::CryptoHelper.aes256_gcm_encrypt(params[:github_access_token])
+        expect(import_status).to receive(:started?).and_return(false)
+        expect(Gitlab::CryptoHelper)
+          .to receive(:aes256_gcm_encrypt).with(params[:github_access_token])
+          .and_return(encrypted_token)
+        expect(Gitlab::GithubGistsImport::StartImportWorker)
+          .to receive(:perform_async).with(user.id, encrypted_token)
+        expect(import_status).to receive(:start!)
+
+        expect(import.execute).to eq({ status: :success })
+      end
+    end
+  end
+end
diff --git a/spec/workers/every_sidekiq_worker_spec.rb b/spec/workers/every_sidekiq_worker_spec.rb
index 79b0cbf254b915d02c281db16c86bc011f2045fa..788f5d8222c8d7590601691cb301ba8a89e21211 100644
--- a/spec/workers/every_sidekiq_worker_spec.rb
+++ b/spec/workers/every_sidekiq_worker_spec.rb
@@ -290,6 +290,8 @@
         'Gitlab::GithubImport::Stage::ImportPullRequestsWorker' => 5,
         'Gitlab::GithubImport::Stage::ImportRepositoryWorker' => 5,
         'Gitlab::GithubGistsImport::ImportGistWorker' => 5,
+        'Gitlab::GithubGistsImport::StartImportWorker' => 5,
+        'Gitlab::GithubGistsImport::FinishImportWorker' => 5,
         'Gitlab::JiraImport::AdvanceStageWorker' => 5,
         'Gitlab::JiraImport::ImportIssueWorker' => 5,
         'Gitlab::JiraImport::Stage::FinishImportWorker' => 5,
diff --git a/spec/workers/gitlab/github_gists_import/finish_import_worker_spec.rb b/spec/workers/gitlab/github_gists_import/finish_import_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..c4c19f2f9c558c54a72862360615efa92ad8cce7
--- /dev/null
+++ b/spec/workers/gitlab/github_gists_import/finish_import_worker_spec.rb
@@ -0,0 +1,51 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::GithubGistsImport::FinishImportWorker, feature_category: :importers do
+  subject(:worker) { described_class.new }
+
+  let_it_be(:user) { create(:user) }
+
+  describe '#perform', :aggregate_failures do
+    context 'when there are no remaining jobs' do
+      it 'marks import status as finished' do
+        waiter = instance_double(Gitlab::JobWaiter, key: :key, jobs_remaining: 0)
+        expect(Gitlab::JobWaiter).to receive(:new).and_return(waiter)
+        expect(waiter).to receive(:wait).with(described_class::BLOCKING_WAIT_TIME)
+        expect_next_instance_of(Gitlab::GithubGistsImport::Status) do |status|
+          expect(status).to receive(:finish!)
+        end
+        expect(Gitlab::GithubImport::Logger)
+          .to receive(:info)
+          .with(user_id: user.id, message: 'GitHub Gists import finished')
+
+        worker.perform(user.id, waiter.key, waiter.jobs_remaining)
+      end
+    end
+
+    context 'when there are remaining jobs' do
+      it 'reschedules the worker' do
+        waiter = instance_double(Gitlab::JobWaiter, key: :key, jobs_remaining: 2)
+        expect(Gitlab::JobWaiter).to receive(:new).and_return(waiter)
+        expect(waiter).to receive(:wait).with(described_class::BLOCKING_WAIT_TIME)
+        expect(described_class).to receive(:perform_in)
+          .with(described_class::INTERVAL, user.id, waiter.key, waiter.jobs_remaining)
+
+        worker.perform(user.id, waiter.key, waiter.jobs_remaining)
+      end
+    end
+  end
+
+  describe '.sidekiq_retries_exhausted' do
+    it 'sets status to failed' do
+      job = { 'args' => [user.id, 'some_key', '1'], 'jid' => '123' }
+
+      expect_next_instance_of(Gitlab::GithubGistsImport::Status) do |status|
+        expect(status).to receive(:fail!)
+      end
+
+      described_class.sidekiq_retries_exhausted_block.call(job)
+    end
+  end
+end
diff --git a/spec/workers/gitlab/github_gists_import/start_import_worker_spec.rb b/spec/workers/gitlab/github_gists_import/start_import_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..523b7463a9d852cb2a1a5a8933b9d927d13ed428
--- /dev/null
+++ b/spec/workers/gitlab/github_gists_import/start_import_worker_spec.rb
@@ -0,0 +1,110 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Gitlab::GithubGistsImport::StartImportWorker, feature_category: :importers do
+  subject(:worker) { described_class.new }
+
+  let_it_be(:user) { create(:user) }
+  let(:token) { Gitlab::CryptoHelper.aes256_gcm_encrypt('token') }
+  let(:importer) { instance_double(Gitlab::GithubGistsImport::Importer::GistsImporter) }
+  let(:waiter) { instance_double(Gitlab::JobWaiter, key: :key, jobs_remaining: 1) }
+  let(:importer_context) { Struct.new(:success?, :error, :waiter, :next_attempt_in, keyword_init: true) }
+  let(:log_attributes) do
+    {
+      'user_id' => user.id,
+      'class' => described_class.name,
+      'correlation_id' => 'new-correlation-id',
+      'jid' => nil,
+      'job_status' => 'running',
+      'queue' => 'github_gists_importer:github_gists_import_start_import'
+    }
+  end
+
+  describe '#perform', :aggregate_failures do
+    before do
+      allow(Gitlab::GithubImport::Logger)
+        .to receive(:info)
+        .with(log_attributes.merge('message' => 'starting importer'))
+
+      allow(Gitlab::ApplicationContext).to receive(:current).and_return('correlation_id' => 'new-correlation-id')
+      allow(described_class).to receive(:queue).and_return('github_gists_importer:github_gists_import_start_import')
+    end
+
+    context 'when import was successful' do
+      it 'imports all the gists' do
+        expect(Gitlab::CryptoHelper)
+          .to receive(:aes256_gcm_decrypt)
+          .with(token)
+          .and_call_original
+
+        expect(Gitlab::GithubGistsImport::Importer::GistsImporter)
+          .to receive(:new)
+          .with(user, 'token')
+          .and_return(importer)
+
+        expect(importer)
+          .to receive(:execute)
+          .and_return(importer_context.new(success?: true, waiter: waiter))
+
+        expect(Gitlab::GithubGistsImport::FinishImportWorker)
+          .to receive(:perform_async)
+          .with(user.id, waiter.key, waiter.jobs_remaining)
+
+        expect(Gitlab::GithubImport::Logger)
+          .to receive(:info)
+          .with(log_attributes.merge('message' => 'importer finished'))
+
+        worker.perform(user.id, token)
+      end
+    end
+
+    context 'when importer returns an error' do
+      it 'raises an error' do
+        exception = StandardError.new('_some_error_')
+        importer_result = importer_context.new(success?: false, error: exception)
+
+        expect_next_instance_of(Gitlab::GithubGistsImport::Importer::GistsImporter) do |importer|
+          expect(importer).to receive(:execute).and_return(importer_result)
+        end
+
+        expect(Gitlab::GithubImport::Logger)
+          .to receive(:error)
+          .with(log_attributes.merge('message' => 'import failed', 'error.message' => exception.message))
+
+        expect { worker.perform(user.id, token) }.to raise_error(StandardError)
+      end
+    end
+
+    context 'when rate limit is reached' do
+      it 'reschedules worker' do
+        exception = Gitlab::GithubImport::RateLimitError.new
+        importer_result = importer_context.new(success?: false, error: exception, next_attempt_in: 5)
+
+        expect_next_instance_of(Gitlab::GithubGistsImport::Importer::GistsImporter) do |importer|
+          expect(importer).to receive(:execute).and_return(importer_result)
+        end
+
+        expect(Gitlab::GithubImport::Logger)
+          .to receive(:info)
+          .with(log_attributes.merge('message' => 'rate limit reached'))
+
+        expect(described_class).to receive(:perform_in).with(5, user.id, token)
+
+        worker.perform(user.id, token)
+      end
+    end
+  end
+
+  describe '.sidekiq_retries_exhausted' do
+    it 'sets status to failed' do
+      job = { 'args' => [user.id, token], 'jid' => '123' }
+
+      expect_next_instance_of(Gitlab::GithubGistsImport::Status) do |status|
+        expect(status).to receive(:fail!)
+      end
+
+      described_class.sidekiq_retries_exhausted_block.call(job)
+    end
+  end
+end