diff --git a/config/feature_flags/wip/bitbucket_import_resumable_worker.yml b/config/feature_flags/wip/bitbucket_import_resumable_worker.yml new file mode 100644 index 0000000000000000000000000000000000000000..ae411b7ae0b99daac780c02d2a79e998cc935785 --- /dev/null +++ b/config/feature_flags/wip/bitbucket_import_resumable_worker.yml @@ -0,0 +1,9 @@ +--- +name: bitbucket_import_resumable_worker +feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/466231 +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/156797 +rollout_issue_url: +milestone: '17.2' +group: group::import and integrate +type: wip +default_enabled: false diff --git a/lib/bitbucket/client.rb b/lib/bitbucket/client.rb index c97d16730578d9d47b5d3bf9521cc42b345dd57f..c031851af3d7be1e13c67e9523f8412c2935ae2d 100644 --- a/lib/bitbucket/client.rb +++ b/lib/bitbucket/client.rb @@ -29,12 +29,45 @@ class Client values.links values.summary values.reviewers + next ].freeze def initialize(options = {}) @connection = Connection.new(options) end + # Fetches data from the Bitbucket API and yields a Page object for every page + # of data, without loading all of them into memory. + # + # method - The method name used for getting the data. + # representation_type - The representation type name used to wrap the result + # args - Arguments to pass to the method. + def each_page(method, representation_type, *args) + options = + if args.last.is_a?(Hash) + args.last + else + {} + end + + loop do + parsed_response = fetch_data(method, *args) + object = Page.new(parsed_response, representation_type) + + yield object + + break unless object.next? 
+ + options[:next_url] = object.next + + if args.last.is_a?(Hash) + args[-1] = options + else + args.push(options) + end + end + end + def last_issue(repo) parsed_response = connection.get("/repositories/#{repo}/issues?pagelen=1&sort=-created_on&state=ALL") Bitbucket::Representation::Issue.new(parsed_response['values'].first) @@ -50,9 +83,15 @@ def issue_comments(repo, issue_id) get_collection(path, :comment) end - def pull_requests(repo) + def pull_requests(repo, options = {}) path = "/repositories/#{repo}/pullrequests?state=ALL&sort=created_on&fields=#{pull_request_values}" - get_collection(path, :pull_request) + + if options[:raw] + path = options[:next_url] if options[:next_url] + connection.get(path) + else + get_collection(path, :pull_request) + end end def pull_request_comments(repo, pull_request) @@ -91,6 +130,14 @@ def users(workspace_key, page_number: nil, limit: nil) private + def fetch_data(method, *args) + case method + when :pull_requests then pull_requests(*args) + else + raise ArgumentError, "Unknown data method #{method}" + end + end + def get_collection(path, type, page_number: nil, limit: nil) paginator = Paginator.new(connection, path, type, page_number: page_number, limit: limit) Collection.new(paginator) diff --git a/lib/gitlab/bitbucket_import/importers/pull_requests_importer.rb b/lib/gitlab/bitbucket_import/importers/pull_requests_importer.rb index 99bf434a396ac53bd780eed84a665386bc0fb0d0..c9de0d3384153f4ab7fcf7de3e4405c2cdbc24e5 100644 --- a/lib/gitlab/bitbucket_import/importers/pull_requests_importer.rb +++ b/lib/gitlab/bitbucket_import/importers/pull_requests_importer.rb @@ -7,6 +7,31 @@ class PullRequestsImporter include ParallelScheduling def execute + bitbucket_import_resumable_worker = + project.import_data&.data&.dig('bitbucket_import_resumable_worker') + + if bitbucket_import_resumable_worker + resumable_execute + else + non_resumable_execute + end + end + + private + + def resumable_execute + log_info(import_stage: 
'import_pull_requests', message: 'importing pull requests') + + each_object_to_import do |object| + job_delay = calculate_job_delay(job_waiter.jobs_remaining) + + sidekiq_worker_class.perform_in(job_delay, project.id, object.to_hash, job_waiter.key) + end + + job_waiter + end + + def non_resumable_execute log_info(import_stage: 'import_pull_requests', message: 'importing pull requests') pull_requests = client.pull_requests(project.import_source) @@ -29,8 +54,6 @@ def execute job_waiter end - private - def sidekiq_worker_class ImportPullRequestWorker end @@ -39,8 +62,22 @@ def collection_method :pull_requests end + def collection_options + { raw: true } + end + + def representation_type + :pull_request + end + def id_for_already_enqueued_cache(object) - object.iid + if object.is_a?(Hash) + # used for `resumable_execute` + object[:iid] + else + # used for `non_resumable_execute` + object.iid + end end # To avoid overloading Gitaly, we use a smaller limit for pull requests than the one defined in the diff --git a/lib/gitlab/bitbucket_import/parallel_scheduling.rb b/lib/gitlab/bitbucket_import/parallel_scheduling.rb index 1cc5855be76a3f9828f1fd467cbf53484e98539a..c4703c71da9a0ccd83f2a836da92c79a4e3f6e17 100644 --- a/lib/gitlab/bitbucket_import/parallel_scheduling.rb +++ b/lib/gitlab/bitbucket_import/parallel_scheduling.rb @@ -6,7 +6,8 @@ module ParallelScheduling include Loggable include ErrorTracking - attr_reader :project, :already_enqueued_cache_key, :job_waiter_cache_key + attr_reader :project, :already_enqueued_cache_key, :job_waiter_cache_key, :job_waiter_remaining_cache_key, + :page_keyset # The base cache key to use for tracking already enqueued objects. 
ALREADY_ENQUEUED_CACHE_KEY = @@ -16,6 +17,10 @@ module ParallelScheduling JOB_WAITER_CACHE_KEY = 'bitbucket-importer/job-waiter/%{project}/%{collection}' + # The base cache key to use for storing job waiter remaining jobs + JOB_WAITER_REMAINING_CACHE_KEY = + 'bitbucket-importer/job-waiter-remaining/%{project}/%{collection}' + # project - An instance of `Project`. def initialize(project) @project = project @@ -24,10 +29,45 @@ def initialize(project) format(ALREADY_ENQUEUED_CACHE_KEY, project: project.id, collection: collection_method) @job_waiter_cache_key = format(JOB_WAITER_CACHE_KEY, project: project.id, collection: collection_method) + @job_waiter_remaining_cache_key = format(JOB_WAITER_REMAINING_CACHE_KEY, project: project.id, + collection: collection_method) + @page_keyset = Gitlab::Import::PageKeyset.new(project, collection_method, 'bitbucket-importer') + end + + # The method that will be called for traversing through all the objects to + # import, yielding them to the supplied block. + def each_object_to_import + repo = project.import_source + + options = collection_options.merge(next_url: page_keyset.current) + + client.each_page(collection_method, representation_type, repo, options) do |page| + page.items.each do |object| + job_waiter.jobs_remaining = Gitlab::Cache::Import::Caching.increment(job_waiter_remaining_cache_key) + + object = object.to_hash + + next if already_enqueued?(object) + + yield object + + # We mark the object as imported immediately so we don't end up + # scheduling it multiple times. + mark_as_enqueued(object) + end + + page_keyset.set(page.next) if page.next? + end end private + # Any options to be passed to the method used for retrieving the data to + # import. 
+ def collection_options + {} + end + def client @client ||= Bitbucket::Client.new(project.import_data.credentials) end @@ -51,12 +91,18 @@ def collection_method raise NotImplementedError end + # The name of the method to call to retrieve the representation object + def representation_type + raise NotImplementedError + end + def job_waiter @job_waiter ||= begin key = Gitlab::Cache::Import::Caching.read(job_waiter_cache_key) key ||= Gitlab::Cache::Import::Caching.write(job_waiter_cache_key, JobWaiter.generate_key) + jobs_remaining = Gitlab::Cache::Import::Caching.read(job_waiter_remaining_cache_key).to_i || 0 - JobWaiter.new(0, key) + JobWaiter.new(jobs_remaining, key) end end diff --git a/lib/gitlab/bitbucket_import/project_creator.rb b/lib/gitlab/bitbucket_import/project_creator.rb index 2a5c6951073468b3022debd616827debe6085a61..8c7caa70bf06ffc46c1d91eeba6f128bcf95dc72 100644 --- a/lib/gitlab/bitbucket_import/project_creator.rb +++ b/lib/gitlab/bitbucket_import/project_creator.rb @@ -14,6 +14,9 @@ def initialize(repo, name, namespace, current_user, credentials) end def execute + bitbucket_import_resumable_worker = + Feature.enabled?(:bitbucket_import_resumable_worker, current_user) + ::Projects::CreateService.new( current_user, name: name, @@ -25,7 +28,12 @@ def execute import_type: 'bitbucket', import_source: repo.full_name, import_url: clone_url, - import_data: { credentials: credentials }, + import_data: { + credentials: credentials, + data: { + bitbucket_import_resumable_worker: bitbucket_import_resumable_worker + } + }, skip_wiki: skip_wiki ).execute end diff --git a/lib/gitlab/import/page_keyset.rb b/lib/gitlab/import/page_keyset.rb new file mode 100644 index 0000000000000000000000000000000000000000..3ef193a2020ac32a4c746fe7820133631fd6b33f --- /dev/null +++ b/lib/gitlab/import/page_keyset.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Gitlab + module Import + # PageKeyset can be used to keep track of the last imported page of a + # collection, 
allowing workers to resume where they left off in the event of + # an error. + class PageKeyset + attr_reader :cache_key + + # The base cache key to use for storing the last key. + CACHE_KEY = '%{import_type}/page-keyset/%{object}/%{collection}' + + def initialize(object, collection, import_type) + @cache_key = format(CACHE_KEY, import_type: import_type, object: object.id, collection: collection) + end + + # Set the key to the given value. + # + # @param value [String] + # @return [String] + def set(value) + Gitlab::Cache::Import::Caching.write(cache_key, value) + end + + # Get the current value from the cache + # + # @return [String] + def current + Gitlab::Cache::Import::Caching.read(cache_key) + end + + # Expire the key + # + # @return [Boolean] + def expire! + Gitlab::Cache::Import::Caching.expire(cache_key, 0) + end + end + end +end diff --git a/spec/lib/bitbucket/client_spec.rb b/spec/lib/bitbucket/client_spec.rb index 28724bd53ae1c20570817df343648e139431afaf..a485ca5455827f4b574dc18e45a92958d9be5445 100644 --- a/spec/lib/bitbucket/client_spec.rb +++ b/spec/lib/bitbucket/client_spec.rb @@ -14,6 +14,76 @@ subject(:client) { described_class.new(options) } + describe '#each_page' do + let_it_be(:item1) do + { 'username' => 'Ben' } + end + + let_it_be(:item2) do + { 'username' => 'Affleck' } + end + + let_it_be(:item3) do + { 'username' => 'Jane' } + end + + let_it_be(:response1) do + { 'values' => [item1], 'next' => 'https://example.com/next' } + end + + let_it_be(:response2) do + { 'values' => [item2], 'next' => 'https://example.com/next2' } + end + + let_it_be(:response3) do + { 'values' => [item3], 'next' => nil } + end + + before do + allow(client) + .to receive(:pull_requests) + .with('repo') + .and_return(response1) + + allow(client) + .to receive(:pull_requests) + .with('repo', { next_url: 'https://example.com/next' }) + .and_return(response2) + + allow(client) + .to receive(:pull_requests) + .with('repo', { next_url: 'https://example.com/next2' }) + 
.and_return(response3) + end + + it 'yields every retrieved page to the supplied block' do + pages = [] + + client.each_page(:pull_requests, :pull_request, 'repo') { |page| pages << page } + + expect(pages[0]).to be_an_instance_of(Bitbucket::Page) + + expect(pages[0].items.count).to eq(1) + expect(pages[0].items.first.raw).to eq(item1) + expect(pages[0].attrs[:next]).to eq('https://example.com/next') + + expect(pages[1].items.count).to eq(1) + expect(pages[1].items.first.raw).to eq(item2) + expect(pages[1].attrs[:next]).to eq('https://example.com/next2') + + expect(pages[2].items.count).to eq(1) + expect(pages[2].items.first.raw).to eq(item3) + expect(pages[2].attrs[:next]).to eq(nil) + end + + context 'when fetch_data not defined' do + it 'raises argument error' do + expect { client.each_page(:foo, :pull_request, 'repo') } + .to raise_error(ArgumentError, 'Unknown data method foo') + end + end + end + describe '#last_issue' do let(:url) { "#{root_url}/repositories/#{repo}/issues?pagelen=1&sort=-created_on&state=ALL" } @@ -59,6 +129,18 @@ client.pull_requests(repo) end + + context 'with options raw' do + let(:url) { "#{root_url}#{path}" } + + it 'returns raw result' do + stub_request(:get, url).to_return(status: 200, headers: headers, body: '{}') + + client.pull_requests(repo, raw: true) + + expect(WebMock).to have_requested(:get, url) + end + end end describe '#pull_request_comments' do diff --git a/spec/lib/gitlab/bitbucket_import/importers/pull_requests_importer_spec.rb b/spec/lib/gitlab/bitbucket_import/importers/pull_requests_importer_spec.rb index 32e3c3e3d7d1a3d6c01d35f6700f8c12aae67492..cb292a7d6f3671364fba7c888c97ed05bda177c8 100644 --- a/spec/lib/gitlab/bitbucket_import/importers/pull_requests_importer_spec.rb +++ b/spec/lib/gitlab/bitbucket_import/importers/pull_requests_importer_spec.rb @@ -3,29 +3,20 @@ require 'spec_helper' RSpec.describe Gitlab::BitbucketImport::Importers::PullRequestsImporter, :clean_gitlab_redis_shared_state, feature_category: 
:importers do - let_it_be(:project) do - create(:project, :import_started, - import_data_attributes: { - data: { 'project_key' => 'key', 'repo_slug' => 'slug' }, - credentials: { 'base_uri' => 'http://bitbucket.org/', 'user' => 'bitbucket', 'password' => 'password' } - } - ) - end - subject(:importer) { described_class.new(project) } - describe '#execute' do - before do - allow_next_instance_of(Bitbucket::Client) do |client| - allow(client).to receive(:pull_requests).and_return( - [ - Bitbucket::Representation::PullRequest.new({ 'id' => 1, 'state' => 'OPENED' }), - Bitbucket::Representation::PullRequest.new({ 'id' => 2, 'state' => 'DECLINED' }), - Bitbucket::Representation::PullRequest.new({ 'id' => 3, 'state' => 'MERGED' }) - ], - [] - ) - end + shared_examples 'import bitbucket PullRequestsImporter' do |bitbucket_import_resumable_worker| + let_it_be(:project) do + create(:project, :import_started, + import_data_attributes: { + data: { + 'project_key' => 'key', + 'repo_slug' => 'slug', + 'bitbucket_import_resumable_worker' => bitbucket_import_resumable_worker + }, + credentials: { 'base_uri' => 'http://bitbucket.org/', 'user' => 'bitbucket', 'password' => 'password' } + } + ) end it 'imports each pull request in parallel' do @@ -39,20 +30,6 @@ .to match_array(%w[1 2 3]) end - context 'when the client raises an error' do - before do - allow_next_instance_of(Bitbucket::Client) do |client| - allow(client).to receive(:pull_requests).and_raise(StandardError) - end - end - - it 'tracks the failure and does not fail' do - expect(Gitlab::Import::ImportFailureService).to receive(:track).once - - expect(importer.execute).to be_a(Gitlab::JobWaiter) - end - end - context 'when pull request was already enqueued' do before do Gitlab::Cache::Import::Caching.set_add(importer.already_enqueued_cache_key, 1) @@ -68,4 +45,69 @@ end end end + + describe '#resumable_execute' do + before do + allow_next_instance_of(Bitbucket::Client) do |client| + page = 
instance_double('Bitbucket::Page', attrs: [], items: [ + Bitbucket::Representation::PullRequest.new({ 'id' => 1, 'state' => 'OPENED' }), + Bitbucket::Representation::PullRequest.new({ 'id' => 2, 'state' => 'DECLINED' }), + Bitbucket::Representation::PullRequest.new({ 'id' => 3, 'state' => 'MERGED' }) + ]) + + allow(client).to receive(:each_page).and_yield(page) + allow(page).to receive(:next?).and_return(true) + allow(page).to receive(:next).and_return('https://example.com/next') + end + end + + it_behaves_like 'import bitbucket PullRequestsImporter', true do + context 'when the client raises an error' do + before do + allow_next_instance_of(Bitbucket::Client) do |client| + allow(client).to receive(:pull_requests).and_raise(StandardError.new('error fetching PRs')) + end + end + + it 'raises the error' do + expect { importer.execute }.to raise_error(StandardError, 'error fetching PRs') + end + end + end + end + + describe '#non_resumable_execute' do + before do + allow_next_instance_of(Bitbucket::Client) do |client| + allow(client).to receive(:pull_requests).and_return( + [ + Bitbucket::Representation::PullRequest.new({ 'id' => 1, 'state' => 'OPENED' }), + Bitbucket::Representation::PullRequest.new({ 'id' => 2, 'state' => 'DECLINED' }), + Bitbucket::Representation::PullRequest.new({ 'id' => 3, 'state' => 'MERGED' }) + ], + [] + ) + end + end + + it_behaves_like 'import bitbucket PullRequestsImporter', false do + context 'when the client raises an error' do + let(:exception) { StandardError.new('error fetching PRs') } + + before do + allow_next_instance_of(Bitbucket::Client) do |client| + allow(client).to receive(:pull_requests).and_raise(exception) + end + end + + it 'tracks the failure and does not fail' do + expect(Gitlab::Import::ImportFailureService).to receive(:track) + .once + .with(a_hash_including(exception: exception)) + + expect(importer.execute).to be_a(Gitlab::JobWaiter) + end + end + end + end end diff --git 
a/spec/lib/gitlab/bitbucket_import/parallel_scheduling_spec.rb b/spec/lib/gitlab/bitbucket_import/parallel_scheduling_spec.rb index 269ff6631e636bef868744e653164ba335f848fb..b3eb591de48b41cf5458b8730ac853d6982ea1c8 100644 --- a/spec/lib/gitlab/bitbucket_import/parallel_scheduling_spec.rb +++ b/spec/lib/gitlab/bitbucket_import/parallel_scheduling_spec.rb @@ -3,19 +3,29 @@ require 'spec_helper' RSpec.describe Gitlab::BitbucketImport::ParallelScheduling, feature_category: :importers do - let_it_be(:project) { build(:project) } + let_it_be(:project) do + create(:project, :import_started, import_source: 'foo/bar', + import_data_attributes: { + data: { + 'project_key' => 'key', + 'repo_slug' => 'slug' + }, + credentials: { 'base_uri' => 'http://bitbucket.org/', 'user' => 'bitbucket', 'password' => 'password' } + } + ) + end - describe '#calculate_job_delay' do - let(:importer_class) do - Class.new do - include Gitlab::BitbucketImport::ParallelScheduling + let(:importer_class) do + Class.new do + include Gitlab::BitbucketImport::ParallelScheduling - def collection_method - :issues - end + def collection_method + :issues end end + end + describe '#calculate_job_delay' do let(:importer) { importer_class.new(project) } before do @@ -34,4 +44,120 @@ def collection_method expect(importer.send(:calculate_job_delay, 100)).to eq(50.minutes) end end + + describe '#each_object_to_import' do + let_it_be(:opened_issue) { Bitbucket::Representation::Issue.new({ 'id' => 1, 'state' => 'OPENED' }) } + let_it_be(:object) { opened_issue.to_hash } + + let(:importer) { importer_class.new(project) } + + context 'without representation_type' do + it 'raises NotImplementedError' do + expect { importer_class.new(project).each_object_to_import }.to raise_error(NotImplementedError) + end + end + + context 'with representation_type' do + before do + allow(importer) + .to receive(:representation_type) + .and_return(:issue) + end + + it 'yields every object to import' do + page = 
instance_double('Bitbucket::Page', attrs: [], items: [opened_issue]) + allow(page).to receive(:next?).and_return(true) + allow(page).to receive(:next).and_return('https://example.com/next') + + allow_next_instance_of(Bitbucket::Client) do |client| + expect(client) + .to receive(:each_page) + .with(:issues, :issue, 'foo/bar', { next_url: nil }) + .and_yield(page) + end + + expect(importer.page_keyset) + .to receive(:set) + .with('https://example.com/next') + .and_return(true) + + expect(importer) + .to receive(:already_enqueued?) + .with(object) + .and_return(false) + + expect(importer) + .to receive(:mark_as_enqueued) + .with(object) + + expect { |b| importer.each_object_to_import(&b) } + .to yield_with_args(object) + end + + it 'resumes from the last page' do + page = instance_double('Bitbucket::Page', attrs: [], items: [opened_issue]) + allow(page).to receive(:next?).and_return(true) + allow(page).to receive(:next).and_return('https://example.com/next2') + + expect(importer.page_keyset) + .to receive(:current) + .and_return('https://example.com/next') + + allow_next_instance_of(Bitbucket::Client) do |client| + expect(client) + .to receive(:each_page) + .with(:issues, :issue, 'foo/bar', { + next_url: 'https://example.com/next' + }) + .and_yield(page) + end + + expect(importer.page_keyset) + .to receive(:set) + .with('https://example.com/next2') + .and_return(true) + + expect(importer) + .to receive(:already_enqueued?) 
+ .with(object) + .and_return(false) + + expect(importer) + .to receive(:mark_as_enqueued) + .with(object) + + expect { |b| importer.each_object_to_import(&b) } + .to yield_with_args(object) + end + + it 'does not yield the object if it was already imported' do + page = instance_double('Bitbucket::Page', attrs: [], items: [opened_issue]) + allow(page).to receive(:next?).and_return(true) + allow(page).to receive(:next).and_return('https://example.com/next') + + allow_next_instance_of(Bitbucket::Client) do |client| + expect(client) + .to receive(:each_page) + .with(:issues, :issue, 'foo/bar', { next_url: nil }) + .and_yield(page) + end + + expect(importer.page_keyset) + .to receive(:set) + .with('https://example.com/next') + .and_return(true) + + expect(importer) + .to receive(:already_enqueued?) + .with(object) + .and_return(true) + + expect(importer) + .not_to receive(:mark_as_enqueued) + + expect { |b| importer.each_object_to_import(&b) } + .not_to yield_control + end + end + end end diff --git a/spec/lib/gitlab/import/page_keyset_spec.rb b/spec/lib/gitlab/import/page_keyset_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..73144bda88d0b20366f92eeed553dd431f1d64a1 --- /dev/null +++ b/spec/lib/gitlab/import/page_keyset_spec.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Gitlab::Import::PageKeyset, :clean_gitlab_redis_shared_state, feature_category: :importers do + let(:project) { instance_double(Project, id: 1) } + let(:keyset) { described_class.new(project, :issues, 'bitbucket-import') } + + describe '#initialize' do + it 'sets the initial next url to be nil when no value is cached' do + expect(keyset.current).to eq(nil) + end + + it 'sets the initial next url to the cached value when one is present' do + Gitlab::Cache::Import::Caching.write(keyset.cache_key, 'https://example.com/nextpresent') + + expect(described_class.new(project, :issues, 'bitbucket-import').current).to 
eq('https://example.com/nextpresent')
+    end
+  end
+
+  describe '#set' do
+    it 'sets the next url' do
+      keyset.set('https://example.com/next')
+      expect(keyset.current).to eq('https://example.com/next')
+    end
+  end
+
+  describe '#expire!' do
+    it 'expires the current next url' do
+      keyset.set('https://example.com/next')
+
+      keyset.expire!
+
+      expect(Gitlab::Cache::Import::Caching.read(keyset.cache_key)).to be_nil
+      expect(keyset.current).to be_nil
+    end
+  end
+end