From 1ef002ff40cb506ba15570172b482ac080bc937e Mon Sep 17 00:00:00 2001 From: Doug Stull Date: Fri, 22 Aug 2025 14:54:09 -0400 Subject: [PATCH] Add pubsub event to access request notification publication - In order to fulfill the notifications [design blueprint](https://gitlab.com/gitlab-com/content-sites/handbook/-/blob/main/content/handbook/engineering/architecture/design-documents/notifications/_index.md?ref_type=heads) we need to move every place where we create or resolve todos or notifications to use the event store. This MR handles the notification emails which are triggered when a member requests access to a group or project. - see https://gitlab.com/gitlab-org/gitlab/-/issues/541845 --- app/events/members/access_request_event.rb | 21 ++++ app/models/member.rb | 12 +- app/workers/all_queues.yml | 10 ++ .../members/process_access_request_worker.rb | 35 ++++++ ..._store_migration_member_access_request.yml | 10 ++ config/sidekiq_queues.yml | 2 + ee/spec/lib/ee/gitlab/event_store_spec.rb | 1 + lib/gitlab/event_store.rb | 1 + spec/models/member_spec.rb | 105 ++++++++++++++---- .../process_access_request_worker_spec.rb | 53 +++++++++ 10 files changed, 224 insertions(+), 26 deletions(-) create mode 100644 app/events/members/access_request_event.rb create mode 100644 app/workers/members/process_access_request_worker.rb create mode 100644 config/feature_flags/gitlab_com_derisk/notification_event_store_migration_member_access_request.yml create mode 100644 spec/workers/members/process_access_request_worker_spec.rb diff --git a/app/events/members/access_request_event.rb b/app/events/members/access_request_event.rb new file mode 100644 index 00000000000000..d194b71e4fb1cc --- /dev/null +++ b/app/events/members/access_request_event.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +module Members + class AccessRequestEvent < ::Gitlab::EventStore::Event + def schema + { + 'type' => 'object', + 'required' => %w[member_id recipient_ids], + 'properties' => { + 'member_id' => { 'type' => 'integer' }, 
+ 'recipient_ids' => { + 'type' => 'array', + 'items' => { + 'type' => 'integer' + } + } + } + } + end + end +end diff --git a/app/models/member.rb b/app/models/member.rb index d95c8d1b41fc02..d77773c09775bc 100644 --- a/app/models/member.rb +++ b/app/models/member.rb @@ -685,9 +685,15 @@ def send_invite end def send_request - if notifiable?(:subscription) - source.access_request_approvers_to_be_notified.each do |recipient| - Members::AccessRequestedMailer.with(member: self, recipient: recipient.user).email.deliver_later + if notifiable?(:subscription) && source.access_request_approvers_to_be_notified.any? + if Feature.enabled?(:notification_event_store_migration_member_access_request, user) + data = { member_id: id, recipient_ids: source.access_request_approvers_to_be_notified.ids } + + Gitlab::EventStore.publish(Members::AccessRequestEvent.new(data: data)) + else + source.access_request_approvers_to_be_notified.each do |recipient| + Members::AccessRequestedMailer.with(member: self, recipient: recipient.user).email.deliver_later + end end end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 3e49f510922e1f..e3ebb3cc6a02d4 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -4240,6 +4240,16 @@ :tags: - :cronjob_child :queue_namespace: +- :name: members_process_access_request + :worker_name: Members::ProcessAccessRequestWorker + :feature_category: :system_access + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] + :queue_namespace: - :name: members_prune_deletions :worker_name: Members::PruneDeletionsWorker :feature_category: :seat_cost_management diff --git a/app/workers/members/process_access_request_worker.rb b/app/workers/members/process_access_request_worker.rb new file mode 100644 index 00000000000000..d7793a16f63c3d --- /dev/null +++ b/app/workers/members/process_access_request_worker.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true 
+ +module Members + class ProcessAccessRequestWorker + include Gitlab::EventStore::Subscriber + + data_consistency :always + feature_category :system_access + urgency :low + defer_on_database_health_signal :gitlab_main, [:members], 1.minute + idempotent! + + def handle_event(event) + member = Member.find_by_id(event.data[:member_id]) + unless member + logger.info( + structured_payload(message: 'Member not found.', member_id: event.data[:member_id]) + ) + return + end + + recipients = Member.id_in(event.data[:recipient_ids]) + unless recipients.any? + logger.info( + structured_payload(message: 'Recipients not found.', recipient_ids: event.data[:recipient_ids]) + ) + return + end + + recipients.each do |recipient| + Members::AccessRequestedMailer.with(member: member, recipient: recipient.user).email.deliver_later # rubocop:disable CodeReuse/ActiveRecord -- false positive on `with` + end + end + end +end diff --git a/config/feature_flags/gitlab_com_derisk/notification_event_store_migration_member_access_request.yml b/config/feature_flags/gitlab_com_derisk/notification_event_store_migration_member_access_request.yml new file mode 100644 index 00000000000000..c8469166e0fb9b --- /dev/null +++ b/config/feature_flags/gitlab_com_derisk/notification_event_store_migration_member_access_request.yml @@ -0,0 +1,10 @@ +--- +name: notification_event_store_migration_member_access_request +description: +feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/541845 +introduced_by_url: +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/564083 +milestone: '18.4' +group: group::engagement +type: gitlab_com_derisk +default_enabled: false diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 6374480d48a929..37c648e95698e8 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -617,6 +617,8 @@ - 1 - - members_groups_export_detailed_memberships - 1 +- - members_process_access_request + - 1 - - members_prune_deletions - 1 - - 
merge diff --git a/ee/spec/lib/ee/gitlab/event_store_spec.rb b/ee/spec/lib/ee/gitlab/event_store_spec.rb index b35aaa58710317..83813d4e54720c 100644 --- a/ee/spec/lib/ee/gitlab/event_store_spec.rb +++ b/ee/spec/lib/ee/gitlab/event_store_spec.rb @@ -15,6 +15,7 @@ ::Ci::PipelineCreatedEvent, ::Ci::JobSecurityScanCompletedEvent, ::Repositories::KeepAroundRefsCreatedEvent, + ::Members::AccessRequestEvent, ::MergeRequests::ApprovedEvent, ::MergeRequests::MergedEvent, ::MergeRequests::JiraTitleDescriptionUpdateEvent, diff --git a/lib/gitlab/event_store.rb b/lib/gitlab/event_store.rb index 83286c288a09ff..c558100805ae2e 100644 --- a/lib/gitlab/event_store.rb +++ b/lib/gitlab/event_store.rb @@ -59,6 +59,7 @@ def configure!(store) store.subscribe ::Pages::DeletePagesDeploymentWorker, to: ::Projects::ProjectArchivedEvent store.subscribe ::Pages::ResetPagesDefaultDomainRedirectWorker, to: ::Pages::Domains::PagesDomainDeletedEvent store.subscribe ::MergeRequests::ProcessDraftNotePublishedWorker, to: ::MergeRequests::DraftNotePublishedEvent + store.subscribe ::Members::ProcessAccessRequestWorker, to: ::Members::AccessRequestEvent subscribe_to_member_destroyed_events(store) end diff --git a/spec/models/member_spec.rb b/spec/models/member_spec.rb index 10583e8a964c57..0075207ecc2af7 100644 --- a/spec/models/member_spec.rb +++ b/spec/models/member_spec.rb @@ -1280,24 +1280,20 @@ context 'for request emails' do let_it_be(:new_approver) { create(:user) } - before do - allow(Members::AccessRequestedMailer).to receive(:with).and_call_original - end - context 'for a project' do let_it_be(:project) { create(:project, :public, maintainers: new_approver) } let_it_be(:recipient) { project.add_maintainer(project.first_owner).user } - before_all do - project.add_maintainer(new_approver) - end + it 'publishes an event with all approvers' do + allow(::Gitlab::EventStore).to receive(:publish) - it 'enqueues emails for all approvers' do - expect do - member = create(:project_member, source: 
project, requested_at: Time.current) - expect(Members::AccessRequestedMailer).to have_received(:with).with(member: member, recipient: recipient) - expect(Members::AccessRequestedMailer).to have_received(:with).with(member: member, recipient: new_approver) - end.to have_enqueued_mail(Members::AccessRequestedMailer, :email).exactly(:twice) + member = create(:project_member, source: project, requested_at: Time.current) + expected_recipient_ids = [recipient.members.first.id, new_approver.members.first.id] + + expect(::Gitlab::EventStore).to have_received(:publish) do |event| + expect(event.data[:member_id]).to eq(member.id) + expect(event.data[:recipient_ids]).to match_array(expected_recipient_ids) + end end context 'and not eligible for notification' do @@ -1308,9 +1304,38 @@ end it 'does not enqueue emails' do + expect(::Gitlab::EventStore).not_to receive(:publish) + + create(:project_member, source: project, requested_at: Time.current) + end + end + + context 'when notification_event_store_migration_member_access_request feature flag is disabled' do + before do + allow(Members::AccessRequestedMailer).to receive(:with).and_call_original + stub_feature_flags(notification_event_store_migration_member_access_request: false) + end + + it 'enqueues emails for all approvers' do expect do - create(:project_member, source: project, requested_at: Time.current) - end.not_to have_enqueued_mail(Members::AccessRequestedMailer, :email) + member = create(:project_member, source: project, requested_at: Time.current) + expect(Members::AccessRequestedMailer).to have_received(:with).with(member: member, recipient: recipient) + expect(Members::AccessRequestedMailer).to have_received(:with).with(member: member, recipient: new_approver) + end.to have_enqueued_mail(Members::AccessRequestedMailer, :email).exactly(:twice) + end + + context 'and not eligible for notification' do + before do + allow_next_instance_of(ProjectMember) do |instance| + allow(instance).to 
receive(:notifiable?).with(:subscription).and_return(false) + end + end + + it 'does not enqueue emails' do + expect do + create(:project_member, source: project, requested_at: Time.current) + end.not_to have_enqueued_mail(Members::AccessRequestedMailer, :email) + end end end end @@ -1319,12 +1344,16 @@ let_it_be(:recipient) { create(:user) } let_it_be(:group) { create(:group, :public, owners: [recipient, new_approver]) } - it 'enqueues emails for all approvers' do - expect do - member = create(:group_member, source: group, requested_at: Time.current) - expect(Members::AccessRequestedMailer).to have_received(:with).with(member: member, recipient: recipient) - expect(Members::AccessRequestedMailer).to have_received(:with).with(member: member, recipient: new_approver) - end.to have_enqueued_mail(Members::AccessRequestedMailer, :email).exactly(:twice) + it 'publishes an event with all approvers' do + allow(::Gitlab::EventStore).to receive(:publish) + + member = create(:group_member, source: group, requested_at: Time.current) + expected_recipient_ids = [recipient.members.first.id, new_approver.members.first.id] + + expect(::Gitlab::EventStore).to have_received(:publish) do |event| + expect(event.data[:member_id]).to eq(member.id) + expect(event.data[:recipient_ids]).to match_array(expected_recipient_ids) + end end context 'and not eligible for notification' do @@ -1335,9 +1364,39 @@ end it 'does not enqueue emails' do + expect(::Gitlab::EventStore).not_to receive(:publish) + + create(:group_member, source: group, requested_at: Time.current) + end + end + + context 'when notification_event_store_migration_member_access_request feature flag is disabled' do + before do + allow(Members::AccessRequestedMailer).to receive(:with).and_call_original + stub_feature_flags(notification_event_store_migration_member_access_request: false) + end + + it 'enqueues emails for all approvers' do expect do - create(:group_member, source: group, requested_at: Time.current) - end.not_to 
have_enqueued_mail(Members::AccessRequestedMailer, :email) + member = create(:group_member, source: group, requested_at: Time.current) + expect(Members::AccessRequestedMailer).to have_received(:with).with(member: member, recipient: recipient) + expect(Members::AccessRequestedMailer) + .to have_received(:with).with(member: member, recipient: new_approver) + end.to have_enqueued_mail(Members::AccessRequestedMailer, :email).exactly(:twice) + end + + context 'and not eligible for notification' do + before do + allow_next_instance_of(GroupMember) do |instance| + allow(instance).to receive(:notifiable?).with(:subscription).and_return(false) + end + end + + it 'does not enqueue emails' do + expect do + create(:group_member, source: group, requested_at: Time.current) + end.not_to have_enqueued_mail(Members::AccessRequestedMailer, :email) + end end end end diff --git a/spec/workers/members/process_access_request_worker_spec.rb b/spec/workers/members/process_access_request_worker_spec.rb new file mode 100644 index 00000000000000..92fe4c98461b59 --- /dev/null +++ b/spec/workers/members/process_access_request_worker_spec.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Members::ProcessAccessRequestWorker, feature_category: :system_access do + let_it_be(:new_approver) { create(:user) } + let_it_be(:project) { create(:project, maintainers: new_approver) } + let_it_be(:recipient) { project.add_maintainer(project.first_owner).user } + let_it_be(:member) { project.add_maintainer(create(:user)) } + + let(:recipient_ids) { [recipient.members.first.id, new_approver.members.first.id] } + let(:member_id) { member.id } + let(:data) { { member_id: member_id, recipient_ids: recipient_ids } } + let(:event) { Members::AccessRequestEvent.new(data: data) } + + it_behaves_like 'subscribes to event' + + it 'enqueues emails for all approvers' do + allow(Members::AccessRequestedMailer).to receive(:with).and_call_original + + expect do + 
consume_event(subscriber: described_class, event: event) + + expect(Members::AccessRequestedMailer).to have_received(:with).with(member: member, recipient: recipient) + expect(Members::AccessRequestedMailer).to have_received(:with).with(member: member, recipient: new_approver) + end.to have_enqueued_mail(Members::AccessRequestedMailer, :email).exactly(:twice) + end + + shared_examples 'when object does not exist' do + it 'logs and does not call the mailer' do + expect(Sidekiq.logger).to receive(:info).with(hash_including(log_payload)) + expect(Members::AccessRequestedMailer).not_to receive(:with) + + expect { consume_event(subscriber: described_class, event: event) }.not_to raise_exception + end + end + + context 'when the member does not exist' do + let(:member_id) { non_existing_record_id } + + it_behaves_like 'when object does not exist' do + let(:log_payload) { { 'message' => 'Member not found.', 'member_id' => member_id } } + end + end + + context 'when the recipients do not exist' do + let(:recipient_ids) { [non_existing_record_id, non_existing_record_id] } + + it_behaves_like 'when object does not exist' do + let(:log_payload) { { 'message' => 'Recipients not found.', 'recipient_ids' => recipient_ids } } + end + end +end -- GitLab