From e8c77a6ae4721c1c1f01746af3b411593ec726a7 Mon Sep 17 00:00:00 2001 From: Rodrigo Tomonari Date: Mon, 22 Jan 2024 22:39:02 -0300 Subject: [PATCH] POC - Execute GitHub Import stages in parallel --- app/models/note.rb | 2 + app/services/import/github_service.rb | 3 +- .../stage/import_base_data_worker.rb | 6 +- .../stage/import_collaborators_worker.rb | 10 +- .../stage/import_issue_attachments_worker.rb | 41 +++++++ .../stage/import_issue_events_worker.rb | 12 +- .../import_issue_note_attachments_worker.rb | 41 +++++++ .../stage/import_issues_worker.rb | 25 ++++ .../stage/import_lfs_objects_worker.rb | 10 +- .../stage/import_protected_branches_worker.rb | 18 ++- .../import_pull_request_attachments_worker.rb | 41 +++++++ .../import_pull_request_diff_notes_worker.rb | 35 ++++++ .../import_pull_request_events_worker.rb | 35 ++++++ ...rt_pull_request_note_attachments_worker.rb | 41 +++++++ .../stage/import_pull_requests_worker.rb | 14 ++- .../import_release_attachments_worker.rb | 41 +++++++ .../stage/import_repository_worker.rb | 7 +- .../stage_orchestrator_worker.rb | 107 ++++++++++++++++++ .../github_import/wait_for_workers_worker.rb | 22 ++++ .../github_import_parallel_stages.yml | 8 ++ .../attachments/issue_notes_importer.rb | 27 +++++ .../pull_request_notes_importer.rb | 28 +++++ .../single_endpoint_issue_events_importer.rb | 10 +- ...e_endpoint_pull_request_events_importer.rb | 17 +++ lib/gitlab/github_import/settings.rb | 7 +- lib/gitlab/github_import/stages_management.rb | 46 ++++++++ .../stage_orchestrator_worker_spec.rb | 85 ++++++++++++++ 27 files changed, 714 insertions(+), 25 deletions(-) create mode 100644 app/workers/gitlab/github_import/stage/import_issue_attachments_worker.rb create mode 100644 app/workers/gitlab/github_import/stage/import_issue_note_attachments_worker.rb create mode 100644 app/workers/gitlab/github_import/stage/import_issues_worker.rb create mode 100644 app/workers/gitlab/github_import/stage/import_pull_request_attachments_worker.rb create mode 100644 app/workers/gitlab/github_import/stage/import_pull_request_diff_notes_worker.rb create mode 100644 app/workers/gitlab/github_import/stage/import_pull_request_events_worker.rb create mode 100644 app/workers/gitlab/github_import/stage/import_pull_request_note_attachments_worker.rb create mode 100644 app/workers/gitlab/github_import/stage/import_release_attachments_worker.rb create mode 100644 app/workers/gitlab/github_import/stage_orchestrator_worker.rb create mode 100644 app/workers/gitlab/github_import/wait_for_workers_worker.rb create mode 100644 config/feature_flags/development/github_import_parallel_stages.yml create mode 100644 lib/gitlab/github_import/importer/attachments/issue_notes_importer.rb create mode 100644 lib/gitlab/github_import/importer/attachments/pull_request_notes_importer.rb create mode 100644 lib/gitlab/github_import/importer/single_endpoint_pull_request_events_importer.rb create mode 100644 lib/gitlab/github_import/stages_management.rb create mode 100644 spec/workers/gitlab/github_import/stage_orchestrator_worker_spec.rb diff --git a/app/models/note.rb b/app/models/note.rb index 953632ba9109cf..e8d7893ec01882 100644 --- a/app/models/note.rb +++ b/app/models/note.rb @@ -134,6 +134,8 @@ class Note < ApplicationRecord # Scopes scope :for_commit_id, ->(commit_id) { where(noteable_type: "Commit", commit_id: commit_id) } + scope :for_merge_requests, -> { where(noteable_type: "MergeRequest") } + scope :for_issues, -> { where(noteable_type: "Issue") } scope :system, -> { where(system: true) } scope :user, -> { where(system: false) } scope :not_internal, -> { where(internal: false) } diff --git a/app/services/import/github_service.rb b/app/services/import/github_service.rb index 92a91740304ad7..557a3a65af8348 100644 --- a/app/services/import/github_service.rb +++ b/app/services/import/github_service.rb @@ -167,7 +167,8 @@ def store_import_settings(project) .write( timeout_strategy: params[:timeout_strategy] || ProjectImportData::PESSIMISTIC_TIMEOUT, optional_stages: params[:optional_stages], - extended_events: Feature.enabled?(:github_import_extended_events, current_user) + extended_events: Feature.enabled?(:github_import_extended_events, current_user), + parallel_stages: Feature.enabled?(:github_import_parallel_stages, current_user) ) end end diff --git a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb index d965c1ae847064..703bd1d03b3835 100644 --- a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb @@ -26,7 +26,11 @@ def import(client, project) klass.new(project, client).execute end - ImportPullRequestsWorker.perform_async(project.id) + if import_settings(project).parallel_stages? + Gitlab::GithubImport::StagesManagement.new(project).finish('base_data') + else + ImportPullRequestsWorker.perform_async(project.id) + end end end end diff --git a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb index 38e1fd52889c7b..539af0a5423bb9 100644 --- a/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_collaborators_worker.rb @@ -41,9 +41,13 @@ def skip_to_next_stage(project) end def move_to_next_stage(project, waiters = {}) - AdvanceStageWorker.perform_async( - project.id, waiters.deep_stringify_keys, next_stage(project) - ) + if import_settings(project).parallel_stages? + WaitForWorkersWorker.perform_async(project.id, waiters.deep_stringify_keys, 'collaborators') + else + AdvanceStageWorker.perform_async( + project.id, waiters.deep_stringify_keys, next_stage(project) + ) + end end def next_stage(project) diff --git a/app/workers/gitlab/github_import/stage/import_issue_attachments_worker.rb b/app/workers/gitlab/github_import/stage/import_issue_attachments_worker.rb new file mode 100644 index 00000000000000..9671ddc96f936c --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_issue_attachments_worker.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportIssueAttachmentsWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + data_consistency :delayed + + include StageMethods + + resumes_work_when_interrupted! + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + return skip_stage(project) if import_settings(project).disabled?(:attachments_import) + + waiter = importer.new(project, client).execute + + WaitForWorkersWorker.perform_async(project.id, { waiter.key => waiter.jobs_remaining }, stage_name) + end + + private + + def skip_stage(project) + StagesManagement.new(project).finish(stage_name) + end + + def importer + Importer::Attachments::IssuesImporter + end + + def stage_name + 'issue_attachments' + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb index 9618500604ab78..d5041aaa559a10 100644 --- a/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_issue_events_worker.rb @@ -19,7 +19,7 @@ def import(client, project) importer = ::Gitlab::GithubImport::Importer::SingleEndpointIssueEventsImporter info(project.id, message: "starting importer", importer: importer.name) - waiter = importer.new(project, client).execute + waiter = importer.new(project, client, import_pull_requests: import_pull_requests(project)).execute move_to_next_stage(project, { waiter.key => waiter.jobs_remaining }) end @@ -38,7 +38,15 @@ def skip_to_next_stage(project) end def move_to_next_stage(project, waiters = {}) - AdvanceStageWorker.perform_async(project.id, waiters.deep_stringify_keys, next_stage(project)) + if import_settings(project).parallel_stages? + WaitForWorkersWorker.perform_async(project.id, waiters.deep_stringify_keys, 'issue_events') + else + AdvanceStageWorker.perform_async(project.id, waiters.deep_stringify_keys, next_stage(project)) + end + end + + def import_pull_requests(project) + !import_settings(project).parallel_stages? end def next_stage(project) diff --git a/app/workers/gitlab/github_import/stage/import_issue_note_attachments_worker.rb b/app/workers/gitlab/github_import/stage/import_issue_note_attachments_worker.rb new file mode 100644 index 00000000000000..ede5cc8c3dffc0 --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_issue_note_attachments_worker.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportIssueNoteAttachmentsWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + data_consistency :delayed + + include StageMethods + + resumes_work_when_interrupted! + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + return skip_stage(project) if import_settings(project).disabled?(:attachments_import) + + waiter = importer.new(project, client).execute + + WaitForWorkersWorker.perform_async(project.id, { waiter.key => waiter.jobs_remaining }, stage_name) + end + + private + + def skip_stage(project) + Gitlab::GithubImport::StagesManagement.new(project).finish(stage_name) + end + + def importer + Importer::Attachments::IssueNotesImporter + end + + def stage_name + 'issue_note_attachments' + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_issues_worker.rb b/app/workers/gitlab/github_import/stage/import_issues_worker.rb new file mode 100644 index 00000000000000..4e4019f4663946 --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_issues_worker.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportIssuesWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + data_consistency :delayed + + include StageMethods + + resumes_work_when_interrupted! + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + waiter = Importer::IssuesImporter.new(project, client).execute + + WaitForWorkersWorker.perform_async(project.id, { waiter.key => waiter.jobs_remaining }, 'issues') + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb index 3f57b958418633..4611f0601a84d6 100644 --- a/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_lfs_objects_worker.rb @@ -21,11 +21,11 @@ def import(_client, project) .new(project, nil) .execute - AdvanceStageWorker.perform_async( - project.id, - { waiter.key => waiter.jobs_remaining }, - 'finish' - ) + if import_settings(project).parallel_stages? + WaitForWorkersWorker.perform_async(project.id, { waiter.key => waiter.jobs_remaining }, 'lfs_objects') + else + AdvanceStageWorker.perform_async(project.id, { waiter.key => waiter.jobs_remaining }, 'finish') + end end end end diff --git a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb index 65b9d85f453dea..1191c3c06d367b 100644 --- a/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_protected_branches_worker.rb @@ -18,11 +18,19 @@ def import(client, project) .new(project, client) .execute - AdvanceStageWorker.perform_async( - project.id, - { waiter.key => waiter.jobs_remaining }, - 'lfs_objects' - ) + if import_settings(project).parallel_stages? + WaitForWorkersWorker.perform_async( + project.id, + { waiter.key => waiter.jobs_remaining }, + 'protected_branches' + ) + else + AdvanceStageWorker.perform_async( + project.id, + { waiter.key => waiter.jobs_remaining }, + 'lfs_objects' + ) + end end end end diff --git a/app/workers/gitlab/github_import/stage/import_pull_request_attachments_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_request_attachments_worker.rb new file mode 100644 index 00000000000000..f95afd5e6d8ec9 --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_pull_request_attachments_worker.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportPullRequestAttachmentsWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + data_consistency :delayed + + include StageMethods + + resumes_work_when_interrupted! + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + return skip_stage(project) if import_settings(project).disabled?(:attachments_import) + + waiter = importer.new(project, client).execute + + WaitForWorkersWorker.perform_async(project.id, { waiter.key => waiter.jobs_remaining }, stage_name) + end + + private + + def skip_stage(project) + StagesManagement.new(project).finish(stage_name) + end + + def importer + Importer::Attachments::MergeRequestsImporter + end + + def stage_name + 'pull_request_attachments' + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_pull_request_diff_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_request_diff_notes_worker.rb new file mode 100644 index 00000000000000..772e801b2a02e6 --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_pull_request_diff_notes_worker.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportPullRequestDiffNotesWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + data_consistency :delayed + + include StageMethods + + resumes_work_when_interrupted! + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + waiter = importer.new(project, client).execute + + WaitForWorkersWorker.perform_async(project.id, { waiter.key => waiter.jobs_remaining }, stage_name) + end + + private + + def importer + Importer::SingleEndpointDiffNotesImporter + end + + def stage_name + 'pull_request_diff_notes' + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_pull_request_events_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_request_events_worker.rb new file mode 100644 index 00000000000000..a4763e08bf4c4e --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_pull_request_events_worker.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportPullRequestEventsWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + data_consistency :delayed + + include StageMethods + + resumes_work_when_interrupted! + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + waiter = importer.new(project, client, import_issues: false).execute + + WaitForWorkersWorker.perform_async(project.id, { waiter.key => waiter.jobs_remaining }, stage_name) + end + + private + + def importer + Importer::SingleEndpointPullRequestEventsImporter + end + + def stage_name + 'pull_request_events' + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_pull_request_note_attachments_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_request_note_attachments_worker.rb new file mode 100644 index 00000000000000..30ba32140e5e36 --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_pull_request_note_attachments_worker.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportPullRequestNoteAttachmentsWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + data_consistency :delayed + + include StageMethods + + resumes_work_when_interrupted! + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + return skip_stage(project) if import_settings(project).disabled?(:attachments_import) + + waiter = importer.new(project, client).execute + + WaitForWorkersWorker.perform_async(project.id, { waiter.key => waiter.jobs_remaining }, stage_name) + end + + private + + def skip_stage(project) + Gitlab::GithubImport::StagesManagement.new(project).finish(stage_name) + end + + def importer + Importer::Attachments::PullRequestNotesImporter + end + + def stage_name + 'pull_request_note_attachments' + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb index bcc39b169afeb8..cd243ea50a2707 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb @@ -26,11 +26,15 @@ def import(client, project) .new(project, client) .execute - AdvanceStageWorker.perform_async( - project.id, - { waiter.key => waiter.jobs_remaining }, - 'collaborators' - ) + if import_settings(project).parallel_stages? + WaitForWorkersWorker.perform_async(project.id, { waiter.key => waiter.jobs_remaining }, 'pull_requests') + else + AdvanceStageWorker.perform_async( + project.id, + { waiter.key => waiter.jobs_remaining }, + 'collaborators' + ) + end end private diff --git a/app/workers/gitlab/github_import/stage/import_release_attachments_worker.rb b/app/workers/gitlab/github_import/stage/import_release_attachments_worker.rb new file mode 100644 index 00000000000000..bea3a612960f58 --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_release_attachments_worker.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportReleaseAttachmentsWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + data_consistency :delayed + + include StageMethods + + resumes_work_when_interrupted! + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + return skip_stage(project) if import_settings(project).disabled?(:attachments_import) + + waiter = importer.new(project, client).execute + + WaitForWorkersWorker.perform_async(project.id, { waiter.key => waiter.jobs_remaining }, stage_name) + end + + private + + def skip_stage(project) + Gitlab::GithubImport::StagesManagement.new(project).finish(stage_name) + end + + def importer + Importer::Attachments::ReleasesImporter + end + + def stage_name + 'release_attachments' + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_repository_worker.rb b/app/workers/gitlab/github_import/stage/import_repository_worker.rb index 44481b8a75ce5a..8d1f8b993a138b 100644 --- a/app/workers/gitlab/github_import/stage/import_repository_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_repository_worker.rb @@ -25,7 +25,12 @@ def import(client, project) counter.increment - ImportBaseDataWorker.perform_async(project.id) + if import_settings(project).parallel_stages? + StagesManagement.new(project).finish('repository') + StageOrchestratorWorker.perform_async(project.id) + else + ImportBaseDataWorker.perform_async(project.id) + end end def counter diff --git a/app/workers/gitlab/github_import/stage_orchestrator_worker.rb b/app/workers/gitlab/github_import/stage_orchestrator_worker.rb new file mode 100644 index 00000000000000..785e92e79e6f3d --- /dev/null +++ b/app/workers/gitlab/github_import/stage_orchestrator_worker.rb @@ -0,0 +1,107 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + class StageOrchestratorWorker + include ApplicationWorker + include GithubImport::Queue + + data_consistency :delayed + sidekiq_options retry: 6 + idempotent! + + INTERVAL = 30.seconds + + STAGE_EXECUTION_TREE = { + repository: { + collaborators: {}, + lfs_objects: {}, + protected_branches: {}, + base_data: { + pull_requests: { + pull_request_attachments: {}, + pull_request_diff_notes: {}, + pull_request_events: { + pull_request_note_attachments: {} + } + }, + issues: { + issue_attachments: {}, + issue_events: { + issue_note_attachments: {} + } + } + } + } + }.freeze + + STAGES = { + repository: Stage::ImportRepositoryWorker, + collaborators: Stage::ImportCollaboratorsWorker, + lfs_objects: Stage::ImportLfsObjectsWorker, + protected_branches: Stage::ImportProtectedBranchesWorker, + base_data: Stage::ImportBaseDataWorker, + pull_requests: Stage::ImportPullRequestsWorker, + pull_request_attachments: Stage::ImportPullRequestAttachmentsWorker, + pull_request_diff_notes: Stage::ImportPullRequestDiffNotesWorker, + pull_request_events: Stage::ImportPullRequestEventsWorker, + pull_request_note_attachments: Stage::ImportPullRequestNoteAttachmentsWorker, + issues: Stage::ImportIssuesWorker, + issue_attachments: Stage::ImportIssueAttachmentsWorker, + issue_events: Stage::ImportIssueEventsWorker, + issue_note_attachments: Stage::ImportIssueNoteAttachmentsWorker + }.freeze + + def perform(project_id) + @project = Project.find_by_id(project_id) + + return unless @project + + if @project.import_state&.completed? + info( + project_id, + message: 'Project import is no longer running. Stopping worker.', + import_status: @project.import_state.status + ) + + return + end + + finished = compute(:repository, STAGE_EXECUTION_TREE) + + if finished + Stage::FinishImportWorker.perform_async(@project.id) + else + self.class.perform_in(INTERVAL, @project.id) + end + end + + private + + def start_stage(stage) + stages_management.start(stage) + STAGES[stage].perform_async(@project.id) + end + + def compute(stage, tree) + if stages_management.finished?(stage) + children_status = tree.map do |stage, children| + compute(stage, children) + end + + return children_status.all? + end + + return false if stages_management.running?(stage) + + start_stage(stage) + + false + end + + def stages_management + @stages_management ||= StagesManagement.new(@project) + end + end + end +end diff --git a/app/workers/gitlab/github_import/wait_for_workers_worker.rb b/app/workers/gitlab/github_import/wait_for_workers_worker.rb new file mode 100644 index 00000000000000..737afd8a4525ab --- /dev/null +++ b/app/workers/gitlab/github_import/wait_for_workers_worker.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + # This worker is used by the GitHub importer to wait for a + # number of jobs to complete, without blocking a thread. Once all jobs have + # been completed this worker will mark the stage as finished. + class WaitForWorkersWorker < AdvanceStageWorker # rubocop:disable Scalability/IdempotentWorker + loggable_arguments 1, 2, 3, 4 + + private + + def proceed_to_next_stage(import_state_jid, stage, project_id) + import_state_jid.refresh_jid_expiration + + project = Project.find_by_id(project_id) + + StagesManagement.new(project).finish(stage) + end + end + end +end diff --git a/config/feature_flags/development/github_import_parallel_stages.yml b/config/feature_flags/development/github_import_parallel_stages.yml new file mode 100644 index 00000000000000..dd03156e7efa0d --- /dev/null +++ b/config/feature_flags/development/github_import_parallel_stages.yml @@ -0,0 +1,8 @@ +--- +name: github_import_parallel_stages +introduced_by_url: +rollout_issue_url: +milestone: '16.8' +type: development +group: group::import and integrate +default_enabled: false diff --git a/lib/gitlab/github_import/importer/attachments/issue_notes_importer.rb b/lib/gitlab/github_import/importer/attachments/issue_notes_importer.rb new file mode 100644 index 00000000000000..13bdb989c910ee --- /dev/null +++ b/lib/gitlab/github_import/importer/attachments/issue_notes_importer.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Importer + module Attachments + class IssueNotesImporter < NotesImporter + def initialize(*args, **kwargs) + super(*args, **kwargs) + + @already_imported_cache_key.concat('/issues') + @job_waiter_cache_key.concat('/issues') + @job_waiter_remaining_cache_key.concat('/issues') + end + + private + + # TODO: exclude :system, :noteable_type from select after removing override Note#note method + # https://gitlab.com/gitlab-org/gitlab/-/issues/369923 + def collection + project.notes.for_issues.id_not_in(already_imported_ids).user.select(:id, :note, :system, :noteable_type) + end + end + end + end + end +end diff --git a/lib/gitlab/github_import/importer/attachments/pull_request_notes_importer.rb b/lib/gitlab/github_import/importer/attachments/pull_request_notes_importer.rb new file mode 100644 index 00000000000000..22b7d4a5174283 --- /dev/null +++ b/lib/gitlab/github_import/importer/attachments/pull_request_notes_importer.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Importer + module Attachments + class PullRequestNotesImporter < NotesImporter + def initialize(*args, **kwargs) + super(*args, **kwargs) + + @already_imported_cache_key.concat('/pull-requests') + @job_waiter_cache_key.concat('/pull-requests') + @job_waiter_remaining_cache_key.concat('/pull-requests') + end + + private + + # TODO: exclude :system, :noteable_type from select after removing override Note#note method + # https://gitlab.com/gitlab-org/gitlab/-/issues/369923 + def collection + project.notes.for_merge_requests.id_not_in(already_imported_ids).user + .select(:id, :note, :system, :noteable_type) + end + end + end + end + end +end diff --git a/lib/gitlab/github_import/importer/single_endpoint_issue_events_importer.rb b/lib/gitlab/github_import/importer/single_endpoint_issue_events_importer.rb index 126a0b8fa4ad07..d9873f2a15970d 100644 --- a/lib/gitlab/github_import/importer/single_endpoint_issue_events_importer.rb +++ b/lib/gitlab/github_import/importer/single_endpoint_issue_events_importer.rb @@ -10,7 +10,9 @@ class SingleEndpointIssueEventsImporter PROCESSED_PAGE_CACHE_KEY = 'issues/%{issuable_iid}/%{collection}' BATCH_SIZE = 100 - def initialize(project, client, parallel: true) + attr_reader :import_issues, :import_pull_requests + + def initialize(project, client, parallel: true, import_issues: true, import_pull_requests: true) @project = project @client = client @parallel = parallel @@ -20,6 +22,8 @@ def initialize(project, client, parallel: true) { project: project.id, collection: collection_method } @job_waiter_remaining_cache_key = JOB_WAITER_REMAINING_CACHE_KEY % { project: project.id, collection: collection_method } + @import_issues = import_issues + @import_pull_requests = import_pull_requests end # In single endpoint there is no issue info to which associated related @@ -77,10 +81,14 @@ def collection_method end def issues_collection + return Issue.none unless import_issues + project.issues.where.not(iid: already_imported_parents).select(:id, :iid) # rubocop: disable CodeReuse/ActiveRecord end def merge_requests_collection + return MergeRequest.none unless import_pull_requests + project.merge_requests.where.not(iid: already_imported_parents).select(:id, :iid) # rubocop: disable CodeReuse/ActiveRecord end diff --git a/lib/gitlab/github_import/importer/single_endpoint_pull_request_events_importer.rb b/lib/gitlab/github_import/importer/single_endpoint_pull_request_events_importer.rb new file mode 100644 index 00000000000000..8ea49605248746 --- /dev/null +++ b/lib/gitlab/github_import/importer/single_endpoint_pull_request_events_importer.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Importer + class SingleEndpointPullRequestEventsImporter < SingleEndpointIssueEventsImporter + def initialize(*args, **kwargs) + super(*args, **kwargs) + + @already_imported_cache_key.concat('/pull-requests') + @job_waiter_cache_key.concat('/pull-requests') + @job_waiter_remaining_cache_key.concat('/pull-requests') + end + end + end + end +end diff --git a/lib/gitlab/github_import/settings.rb b/lib/gitlab/github_import/settings.rb index da5833df3a1d13..ce3f5b32bdb5c8 100644 --- a/lib/gitlab/github_import/settings.rb +++ b/lib/gitlab/github_import/settings.rb @@ -67,7 +67,8 @@ def write(user_settings) data: { optional_stages: optional_stages, timeout_strategy: user_settings[:timeout_strategy], - extended_events: user_settings[:extended_events] + extended_events: user_settings[:extended_events], + parallel_stages: user_settings[:parallel_stages] }, credentials: project.import_data&.credentials ) @@ -87,6 +88,10 @@ def extended_events? !!project.import_data&.data&.dig('extended_events') end + def parallel_stages? + !!project.import_data&.data&.dig('parallel_stages') + end + private attr_reader :project diff --git a/lib/gitlab/github_import/stages_management.rb b/lib/gitlab/github_import/stages_management.rb new file mode 100644 index 00000000000000..f8d7820f88bae1 --- /dev/null +++ b/lib/gitlab/github_import/stages_management.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + class StagesManagement + STARTED = 'started' + FINISHED = 'finished' + + def initialize(project) + @project = project + end + + def start(stage) + ::Gitlab::Cache::Import::Caching.hash_add(stages_cache_key, stage, STARTED) + end + + def finish(stage) + ::Gitlab::Cache::Import::Caching.hash_add(stages_cache_key, stage, FINISHED) + end + + def running?(stage) + status(stage) == STARTED + end + + def finished?(stage) + status(stage) == FINISHED + end + + def status(stage) + statuses[stage.to_sym] + end + + def statuses + ::Gitlab::Cache::Import::Caching.values_from_hash(stages_cache_key).symbolize_keys + end + + private + + attr_reader :project + + def stages_cache_key + "github-importer/stages/#{project.id}" + end + end + end +end diff --git a/spec/workers/gitlab/github_import/stage_orchestrator_worker_spec.rb b/spec/workers/gitlab/github_import/stage_orchestrator_worker_spec.rb new file mode 100644 index 00000000000000..a115132238a24d --- /dev/null +++ b/spec/workers/gitlab/github_import/stage_orchestrator_worker_spec.rb @@ -0,0 +1,85 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Gitlab::GithubImport::StageOrchestratorWorker, :clean_gitlab_redis_cache, feature_category: :importers do + let_it_be(:project) { create(:project, import_state: create(:import_state, :started)) } + + let(:stage_management) { Gitlab::GithubImport::StagesManagement.new(project) } + + subject(:worker) { described_class.new } + + describe '#perform' do + context "when repository stage has not started" do + it "starts the repository stage and do not start childreen stages", :aggregate_failures do + expect(Gitlab::GithubImport::Stage::ImportRepositoryWorker).to receive(:perform_async).with(project.id) + expect(described_class).to receive(:perform_in).with(described_class::INTERVAL, project.id) + + described_class::STAGE_EXECUTION_TREE[:repository].each_key do |child| + expect(described_class::STAGES[child]).not_to receive(:perform_async) + end + + expect { worker.perform(project.id) } + .to change { stage_management.running?(:repository) } + .from(false) + .to(true) + end + end + + context "when repository stage is running and the childreen stages have not" do + before do + stage_management.start(:repository) + end + + it "does not start any stages", :aggregate_failures do + described_class::STAGES.each_value do |stage_class| + expect(stage_class).not_to receive(:perform_async) + end + + expect(described_class).to receive(:perform_in).with(described_class::INTERVAL, project.id) + + worker.perform(project.id) + end + end + + context "when repository stage has finished and the childreen stages have not started" do + before do + stage_management.finish(:repository) + end + + it "starts the repository chieldreen stages", :aggregate_failures do + described_class::STAGE_EXECUTION_TREE[:repository].each_key do |child| + expect(described_class::STAGES[child]).to receive(:perform_async).with(project.id) + + expect(stage_management.running?(child)).to eq(false) + end + + expect(described_class).to receive(:perform_in).with(described_class::INTERVAL, project.id) + + worker.perform(project.id) + + described_class::STAGE_EXECUTION_TREE[:repository].each_key do |child| + expect(stage_management.running?(child)).to eq(true) + end + end + end + + context "when all stages finished" do + before do + described_class::STAGES.each_key do |stage| + stage_management.finish(stage) + end + end + + it "does not start any stages and execute finish stage", :aggregate_failures do + described_class::STAGES.each_value do |stage_class| + expect(stage_class).not_to receive(:perform_async) + end + + expect(Gitlab::GithubImport::Stage::FinishImportWorker).to receive(:perform_async).with(project.id) + + worker.perform(project.id) + end + end + end +end -- GitLab