From 0453a29c5737aa47227da74a48b80afeafbbe7cd Mon Sep 17 00:00:00 2001 From: Marc Shaw Date: Fri, 22 Aug 2025 12:50:06 +0200 Subject: [PATCH] Split notification logic from MergeRequests::RefreshService This commit extracts notification-related functionality from the main refresh service into a separate service and worker to improve performance and separation of concerns. Key changes: - Add MergeRequests::Refresh::NotificationService to handle: * Outdating suggestions on merge requests * Marking pending todos as done * Adding system notes for branch presence changes * Tracking merge request activity metrics * Processing commit-related notifications - Add MergeRequests::Refresh::NotificationWorker for async processing - Introduce split_refresh_worker_notification feature flag for gradual rollout - Move time-consuming notification operations to background processing - Maintain backward compatibility when feature flag is disabled This refactoring reduces blocking time in the main refresh flow and allows for better scalability of merge request operations. 
Part of: https://gitlab.com/gitlab-org/gitlab/-/issues/554081 MR: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/202370 --- .../refresh/notification_service.rb | 105 ++++++++++ .../merge_requests/refresh_service.rb | 32 ++- app/workers/all_queues.yml | 10 + .../refresh/notification_worker.rb | 33 +++ .../wip/split_refresh_worker_notification.yml | 10 + config/sidekiq_queues.yml | 2 + .../refresh/notification_service_spec.rb | 196 ++++++++++++++++++ .../merge_requests/refresh_service_spec.rb | 77 +++++++ .../refresh/notification_worker_spec.rb | 137 ++++++++++++ 9 files changed, 597 insertions(+), 5 deletions(-) create mode 100644 app/services/merge_requests/refresh/notification_service.rb create mode 100644 app/workers/merge_requests/refresh/notification_worker.rb create mode 100644 config/feature_flags/wip/split_refresh_worker_notification.yml create mode 100644 spec/services/merge_requests/refresh/notification_service_spec.rb create mode 100644 spec/workers/merge_requests/refresh/notification_worker_spec.rb diff --git a/app/services/merge_requests/refresh/notification_service.rb b/app/services/merge_requests/refresh/notification_service.rb new file mode 100644 index 00000000000000..f8858d5e736e39 --- /dev/null +++ b/app/services/merge_requests/refresh/notification_service.rb @@ -0,0 +1,105 @@ +# frozen_string_literal: true + +module MergeRequests + module Refresh + class NotificationService < MergeRequests::Refresh::BaseService + attr_reader :push + + def execute(oldrev, newrev, ref) + @push = Gitlab::Git::Push.new(@project, oldrev, newrev, ref) + + find_new_commits + notify + end + + private + + def notify + merge_requests_for_source_branch.each do |mr| + outdate_suggestions(mr) + mark_pending_todos_done(mr) + + # Leave a system note if a branch was deleted/added + comment_mr_branch_presence_changed(mr) if branch_added_or_removed? 
+ + # Add comment about pushing new commits to merge requests and send notification emails + notify_about_push(mr) + + merge_request_activity_counter.track_mr_including_ci_config(user: mr.author, merge_request: mr) + end + end + + def outdate_suggestions(merge_request) + outdate_service.execute(merge_request) + end + + def outdate_service + @outdate_service ||= Suggestions::OutdateService.new + end + + def mark_pending_todos_done(merge_request) + todo_service.merge_request_push(merge_request, @current_user) + end + + def branch_added_or_removed? + @push.branch_added? || @push.branch_removed? + end + strong_memoize_attr :branch_added_or_removed? + + # Add comment about branches being deleted or added to merge requests + def comment_mr_branch_presence_changed(merge_request) + presence = @push.branch_added? ? :add : :delete + + SystemNoteService.change_branch_presence( + merge_request, merge_request.project, @current_user, + :source, @push.branch_name, presence) + end + + # Add comment about pushing new commits to merge requests and send notification emails + def notify_about_push(merge_request) + return unless @commits.present? + + mr_commit_ids = Set.new(merge_request.commit_shas) + + new_commits, existing_commits = @commits.partition do |commit| + mr_commit_ids.include?(commit.id) + end + + SystemNoteService.add_commits( + merge_request, merge_request.project, + @current_user, new_commits, + existing_commits, @push.oldrev + ) + + notification_service.push_to_merge_request(merge_request, @current_user, new_commits: new_commits, + existing_commits: existing_commits) + end + + def find_new_commits + if @push.branch_added? + @commits = [] + + merge_request = merge_requests_for_source_branch.first + return unless merge_request + + begin + # Since any number of commits could have been made to the restored branch, + # find the common root to see what has been added. 
+ common_ref = @project.repository.merge_base(merge_request.diff_head_sha, @push.newrev) + # If a commit no longer exists in this repo, gitlab_git throws + # a Rugged::OdbError. This is fixed in https://gitlab.com/gitlab-org/gitlab_git/merge_requests/52 + @commits = @project.repository.commits_between(common_ref, @push.newrev) if common_ref + rescue StandardError + end + elsif @push.branch_removed? + # No commits for a deleted branch. + @commits = [] + else + @commits = @project.repository.commits_between(@push.oldrev, @push.newrev) + end + end + end + end +end + +MergeRequests::Refresh::NotificationService.prepend_mod_with('MergeRequests::Refresh::NotificationService') diff --git a/app/services/merge_requests/refresh_service.rb b/app/services/merge_requests/refresh_service.rb index ff53fc3ca5c9a3..f793c207cc5a47 100644 --- a/app/services/merge_requests/refresh_service.rb +++ b/app/services/merge_requests/refresh_service.rb @@ -29,10 +29,15 @@ def refresh_merge_requests! reload_merge_requests + if Feature.disabled?(:split_refresh_worker_notification, @current_user) + merge_requests_for_source_branch.each do |mr| + outdate_suggestions(mr) + mark_pending_todos_done(mr) + end + end + merge_requests_for_source_branch.each do |mr| - outdate_suggestions(mr) abort_auto_merges(mr) - mark_pending_todos_done(mr) end abort_ff_merge_requests_with_auto_merges @@ -40,18 +45,35 @@ def refresh_merge_requests! merge_requests_for_source_branch.each do |mr| # Leave a system note if a branch was deleted/added - if branch_added_or_removed? + if Feature.disabled?(:split_refresh_worker_notification, @current_user) && branch_added_or_removed? 
comment_mr_branch_presence_changed(mr) end - notify_about_push(mr) + # Add comment about pushing new commits to merge requests and send notification emails + if Feature.disabled?(:split_refresh_worker_notification, @current_user) + notify_about_push(mr) + end + mark_mr_as_draft_from_commits(mr) execute_mr_web_hooks(mr) # Run at the end of the loop to avoid any potential contention on the MR object refresh_pipelines_on_merge_requests(mr) unless @push.branch_removed? - merge_request_activity_counter.track_mr_including_ci_config(user: mr.author, merge_request: mr) + + if Feature.disabled?(:split_refresh_worker_notification, @current_user) + merge_request_activity_counter.track_mr_including_ci_config(user: mr.author, merge_request: mr) + end + end + + if Feature.enabled?(:split_refresh_worker_notification, @current_user) + MergeRequests::Refresh::NotificationWorker.perform_async( + @project.id, + @current_user.id, + @push.oldrev, + @push.newrev, + @push.ref + ) end true diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 6142cbdb996d67..be0a97bce63e41 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -4362,6 +4362,16 @@ :idempotent: true :tags: [] :queue_namespace: +- :name: merge_requests_refresh_notification + :worker_name: MergeRequests::Refresh::NotificationWorker + :feature_category: :code_review_workflow + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :cpu + :weight: 2 + :idempotent: true + :tags: [] + :queue_namespace: - :name: merge_requests_resolve_todos :worker_name: MergeRequests::ResolveTodosWorker :feature_category: :code_review_workflow diff --git a/app/workers/merge_requests/refresh/notification_worker.rb b/app/workers/merge_requests/refresh/notification_worker.rb new file mode 100644 index 00000000000000..8e4cb4545a7379 --- /dev/null +++ b/app/workers/merge_requests/refresh/notification_worker.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +module MergeRequests + module 
Refresh + class NotificationWorker + include ApplicationWorker + + idempotent! + deduplicate :until_executed + + feature_category :code_review_workflow + urgency :low + worker_resource_boundary :cpu + weight 2 + data_consistency :sticky + + defer_on_database_health_signal :gitlab_main, [], 10.seconds + + # NOTE: This worker will be deprecated once we switch to using events + def perform(project_id, user_id, oldrev, newrev, ref) + project = Project.find_by_id(project_id) + return unless project + + user = User.find_by_id(user_id) + return unless user + + MergeRequests::Refresh::NotificationService + .new(project: project, current_user: user) + .execute(oldrev, newrev, ref) + end + end + end +end diff --git a/config/feature_flags/wip/split_refresh_worker_notification.yml b/config/feature_flags/wip/split_refresh_worker_notification.yml new file mode 100644 index 00000000000000..bd451a7060b49f --- /dev/null +++ b/config/feature_flags/wip/split_refresh_worker_notification.yml @@ -0,0 +1,10 @@ +--- +name: split_refresh_worker_notification +description: +feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/554081 +introduced_by_url: +rollout_issue_url: +milestone: '18.4' +group: group::code review +type: wip +default_enabled: false diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 5b79f954397035..429c408b648422 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -663,6 +663,8 @@ - 1 - - merge_requests_resolve_todos - 1 +- - merge_requests_refresh_notification + - 2 - - merge_requests_resolve_todos_after_approval - 1 - - merge_requests_stream_approval_audit_event diff --git a/spec/services/merge_requests/refresh/notification_service_spec.rb b/spec/services/merge_requests/refresh/notification_service_spec.rb new file mode 100644 index 00000000000000..aab8a6681d2a61 --- /dev/null +++ b/spec/services/merge_requests/refresh/notification_service_spec.rb @@ -0,0 +1,196 @@ +# frozen_string_literal: true + +require 
'spec_helper' + +RSpec.describe MergeRequests::Refresh::NotificationService, feature_category: :code_review_workflow do + include ProjectForksHelper + include UserHelpers + + let(:project) { create(:project, :repository) } + let(:user) { create(:user) } + let(:service) { described_class.new(project: project, current_user: user) } + + describe '#execute' do + let(:oldrev) { TestEnv::BRANCH_SHA['improve/awesome'] } + let(:newrev) { TestEnv::BRANCH_SHA['master'] } + let(:ref) { 'refs/heads/master' } + + let!(:merge_request) do + create( + :merge_request, + source_project: project, + source_branch: 'master', + target_branch: 'feature', + target_project: project + ) + end + + let!(:another_merge_request) do + create( + :merge_request, + source_project: project, + source_branch: 'master', + target_branch: 'test', + target_project: project + ) + end + + let!(:build_failed_todo) do + create( + :todo, + :build_failed, + user: user, + project: project, + target: merge_request, + author: user + ) + end + + before do + project.add_developer(user) + end + + it 'creates a push object and calls notify' do + expect(Gitlab::Git::Push).to receive(:new).with(project, oldrev, newrev, ref).and_call_original + expect(service).to receive(:notify) + + service.execute(oldrev, newrev, ref) + end + + describe 'notification behaviors' do + it 'outdates MR suggestions' do + expect_next_instance_of(Suggestions::OutdateService) do |outdate_service| + expect(outdate_service).to receive(:execute).with(merge_request).and_call_original + expect(outdate_service).to receive(:execute).with(another_merge_request).and_call_original + end + + service.execute(oldrev, newrev, ref) + end + + it 'marks pending todos as done' do + expect(build_failed_todo.reload).to be_pending + + service.execute(oldrev, newrev, ref) + + expect(build_failed_todo.reload).to be_done + end + + it 'calls the merge request activity counter' do + expect(Gitlab::UsageDataCounters::MergeRequestActivityUniqueCounter) + .to 
receive(:track_mr_including_ci_config) + .with(user: merge_request.author, merge_request: merge_request) + + expect(Gitlab::UsageDataCounters::MergeRequestActivityUniqueCounter) + .to receive(:track_mr_including_ci_config) + .with(user: another_merge_request.author, merge_request: another_merge_request) + + service.execute(oldrev, newrev, ref) + end + + it 'calls notify_about_push for each merge request' do + expect(service).to receive(:notify_about_push).with(merge_request) + expect(service).to receive(:notify_about_push).with(another_merge_request) + + service.execute(oldrev, newrev, ref) + end + end + + describe 'branch presence changes' do + context 'when branch is added' do + let(:blank_sha) { Gitlab::Git::SHA1_BLANK_SHA } + + it 'adds system note about branch restoration' do + expect do + service.execute(blank_sha, newrev, ref) + end.to change { merge_request.reload.notes.count }.by(1) + + note = merge_request.notes.last + expect(note.note).to include('restored source branch') + end + end + + context 'when branch is removed' do + let(:blank_sha) { Gitlab::Git::SHA1_BLANK_SHA } + + it 'adds system note about branch deletion' do + expect do + service.execute(newrev, blank_sha, ref) + end.to change { merge_request.reload.notes.count }.by(1) + + note = merge_request.notes.last + expect(note.note).to include('deleted source branch') + end + end + end + + describe 'push notifications' do + let(:notification_service) { instance_double(NotificationService) } + + before do + allow(NotificationService).to receive(:new) { notification_service } + end + + context 'when there are new commits' do + it 'adds system notes about commits and sends notifications' do + expect(SystemNoteService).to receive(:add_commits).twice + expect(notification_service).to receive(:push_to_merge_request).twice + + service.execute(oldrev, newrev, ref) + end + end + + context 'when there are no commits' do + let(:oldrev) { newrev } + + it 'does not add system notes or send notifications' do + 
expect(SystemNoteService).not_to receive(:add_commits) + expect(notification_service).not_to receive(:push_to_merge_request) + + service.execute(oldrev, newrev, ref) + end + end + + context 'when branch is removed' do + let(:blank_sha) { Gitlab::Git::SHA1_BLANK_SHA } + + it 'does not add commit notes or send push notifications' do + expect(SystemNoteService).not_to receive(:add_commits) + expect(notification_service).not_to receive(:push_to_merge_request) + + service.execute(newrev, blank_sha, ref) + end + end + end + + describe '#find_new_commits' do + context 'when branch is added' do + let(:blank_sha) { Gitlab::Git::SHA1_BLANK_SHA } + + it 'finds commits between common ref and new rev' do + expect(project.repository).to receive(:merge_base).and_return('common_ref') + expect(project.repository).to receive(:commits_between).with('common_ref', newrev) + + service.execute(blank_sha, newrev, ref) + end + end + + context 'when branch is removed' do + let(:blank_sha) { Gitlab::Git::SHA1_BLANK_SHA } + + it 'sets commits to empty array' do + service.execute(newrev, blank_sha, ref) + + expect(service.instance_variable_get(:@commits)).to eq([]) + end + end + + context 'when branch is updated' do + it 'finds commits between old and new rev' do + expect(project.repository).to receive(:commits_between).with(oldrev, newrev) + + service.execute(oldrev, newrev, ref) + end + end + end + end +end diff --git a/spec/services/merge_requests/refresh_service_spec.rb b/spec/services/merge_requests/refresh_service_spec.rb index 696edd43988be0..ce0223e9929372 100644 --- a/spec/services/merge_requests/refresh_service_spec.rb +++ b/spec/services/merge_requests/refresh_service_spec.rb @@ -9,9 +9,12 @@ let(:project) { create(:project, :repository) } let(:user) { create(:user) } let(:service) { described_class } + let(:notification_ff) { false } describe '#execute' do before do + stub_feature_flags(split_refresh_worker_notification: notification_ff) + @user = create(:user) group = 
create(:group) group.add_owner(@user) @@ -119,6 +122,53 @@ expect(@fork_build_failed_todo).to be_done end + context 'when push is a branch removal' do + before do + # If @newrev is a blank SHA, it means the ref has been removed + @newrev = Gitlab::Git::SHA1_BLANK_SHA + end + + it 'calls change branch system note service' do + expect(SystemNoteService).to receive(:change_branch_presence).at_least(:once) + + refresh_service.execute(@oldrev, @newrev, 'refs/heads/master') + end + + context 'when ff is on' do + let(:notification_ff) { true } + + it 'does not call change branch system note service' do + expect(SystemNoteService).not_to receive(:change_branch_presence) + + refresh_service.execute(@oldrev, @newrev, 'refs/heads/master') + end + end + end + + context 'when ff is on' do + let(:notification_ff) { true } + + it 'does not mark the todos as done' do + refresh_service.execute(@oldrev, @newrev, 'refs/heads/master') + + expect(@build_failed_todo).to be_pending + expect(@fork_build_failed_todo).to be_pending + end + + it 'calls the notification worker' do + expect(MergeRequests::Refresh::NotificationWorker).to receive(:perform_async) + .with( + @project.id, + @user.id, + @oldrev, + @newrev, + 'refs/heads/master' + ) + + refresh_service.execute(@oldrev, @newrev, 'refs/heads/master') + end + end + it 'triggers mergeRequestMergeStatusUpdated GraphQL subscription conditionally' do expect(GraphqlTriggers).to receive(:merge_request_merge_status_updated).twice.with(@merge_request) expect(GraphqlTriggers).to receive(:merge_request_merge_status_updated).twice.with(@another_merge_request) @@ -169,6 +219,16 @@ refresh_service.execute(@oldrev, @newrev, 'refs/heads/master') end + context 'when FF is on' do + let(:notification_ff) { true } + + it 'does not outdate MR suggestions' do + expect(Suggestions::OutdateService).not_to receive(:new) + + refresh_service.execute(@oldrev, @newrev, 'refs/heads/master') + end + end + context 'when source branch ref does not exists' do before do 
::Branches::DeleteService.new(@project, @user).execute(@merge_request.source_branch) @@ -200,6 +260,17 @@ refresh_service.execute(@oldrev, @newrev, 'refs/heads/master') end + + context 'when ff is on' do + let(:notification_ff) { true } + + it 'does not track the activity counter' do + expect(Gitlab::UsageDataCounters::MergeRequestActivityUniqueCounter) + .not_to receive(:track_mr_including_ci_config) + + refresh_service.execute(@oldrev, @newrev, 'refs/heads/master') + end + end end context 'when pipeline exists for the source branch' do @@ -256,6 +327,12 @@ expect { subject } .not_to change { @merge_request.pipelines_for_merge_request.count } end + + it 'calls change branch system note service' do + expect(SystemNoteService).to receive(:change_branch_presence).at_least(:once) + + subject + end end context 'when "push_options: nil" is passed' do diff --git a/spec/workers/merge_requests/refresh/notification_worker_spec.rb b/spec/workers/merge_requests/refresh/notification_worker_spec.rb new file mode 100644 index 00000000000000..41e76f216f3f5b --- /dev/null +++ b/spec/workers/merge_requests/refresh/notification_worker_spec.rb @@ -0,0 +1,137 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe MergeRequests::Refresh::NotificationWorker, feature_category: :code_review_workflow do + include AfterNextHelpers + + let_it_be(:project) { create(:project, :repository) } + let_it_be(:user) { create(:user) } + + let(:worker) { described_class.new } + let(:oldrev) { TestEnv::BRANCH_SHA['flatten-dir'] } + let(:newrev) { TestEnv::BRANCH_SHA['improve/awesome'] } + let(:ref) { 'refs/heads/master' } + + before_all do + project.add_developer(user) + end + + it_behaves_like 'an idempotent worker' do + let(:job_args) { [project.id, user.id, oldrev, newrev, ref] } + end + + describe 'worker configuration' do + it 'has the correct feature category' do + expect(described_class.get_feature_category).to eq(:code_review_workflow) + end + + it 'has the correct urgency' 
do + expect(described_class.get_urgency).to eq(:low) + end + + it 'has the correct resource boundary' do + expect(described_class.get_worker_resource_boundary).to eq(:cpu) + end + + it 'has the correct weight' do + expect(described_class.get_weight).to eq(2) + end + + it 'is configured as idempotent' do + expect(described_class.idempotent?).to be_truthy + end + + it 'has deduplication configured' do + expect(described_class.get_deduplicate_strategy).to eq(:until_executed) + end + end + + describe '#perform' do + it 'calls MergeRequests::Refresh::NotificationService#execute' do + expect_next(MergeRequests::Refresh::NotificationService, project: project, current_user: user) + .to receive(:execute).with(oldrev, newrev, ref) + + worker.perform(project.id, user.id, oldrev, newrev, ref) + end + + it 'creates the service with correct parameters' do + service_double = instance_double(MergeRequests::Refresh::NotificationService) + expect(MergeRequests::Refresh::NotificationService) + .to receive(:new) + .with(project: project, current_user: user) + .and_return(service_double) + expect(service_double).to receive(:execute).with(oldrev, newrev, ref) + + worker.perform(project.id, user.id, oldrev, newrev, ref) + end + + context 'with a non-existing project' do + it 'does nothing when project does not exist' do + expect(MergeRequests::Refresh::NotificationService).not_to receive(:new) + + worker.perform(non_existing_record_id, user.id, oldrev, newrev, ref) + end + + it 'does not raise an exception' do + expect { worker.perform(non_existing_record_id, user.id, oldrev, newrev, ref) } + .not_to raise_exception + end + end + + context 'with a non-existing user' do + it 'does nothing when user does not exist' do + expect(MergeRequests::Refresh::NotificationService).not_to receive(:new) + + worker.perform(project.id, non_existing_record_id, oldrev, newrev, ref) + end + + it 'does not raise an exception' do + expect { worker.perform(project.id, non_existing_record_id, oldrev, newrev, 
ref) } + .not_to raise_exception + end + end + + context 'when both project and user do not exist' do + it 'does nothing' do + expect(MergeRequests::Refresh::NotificationService).not_to receive(:new) + + worker.perform(non_existing_record_id, non_existing_record_id, oldrev, newrev, ref) + end + + it 'does not raise an exception' do + expect { worker.perform(non_existing_record_id, non_existing_record_id, oldrev, newrev, ref) } + .not_to raise_exception + end + end + + context 'when project exists but user does not' do + it 'returns early without calling the service' do + expect(MergeRequests::Refresh::NotificationService).not_to receive(:new) + + worker.perform(project.id, non_existing_record_id, oldrev, newrev, ref) + end + end + + context 'when user exists but project does not' do + it 'returns early without calling the service' do + expect(MergeRequests::Refresh::NotificationService).not_to receive(:new) + + worker.perform(non_existing_record_id, user.id, oldrev, newrev, ref) + end + end + + context 'with valid git references' do + it 'passes through the git references correctly' do + custom_oldrev = 'abc123' + custom_newrev = 'def456' + custom_ref = 'refs/heads/feature' + + expect_next(MergeRequests::Refresh::NotificationService, project: project, current_user: user) + .to receive(:execute).with(custom_oldrev, custom_newrev, custom_ref) + + worker.perform(project.id, user.id, custom_oldrev, custom_newrev, custom_ref) + end + end + end +end -- GitLab