From 80a6edced1cf473180fd2d872dde5da04508100d Mon Sep 17 00:00:00 2001 From: Olivier El Mekki Date: Wed, 8 Nov 2023 15:23:59 +0100 Subject: [PATCH 1/9] ADD sending release activities through ActivityPub In ActivityPub, getting notified of new activities (activities here being similar to entries in an RSS feed) is done in two steps: 1. the user interested in the resource subscribes to it. This was already implemented in previous work. [^1] 2. when an activity happens (a new release for a project, in our current case), we push a JSON payload to the servers of each subscriber to let them know. This commit is about this second part. We already have the list of subscribers and the URLs where to reach them (called the user inbox) in our database; we now need to fire the requests when the release event happens. Worth noting: ActivityPub is well designed enough to make sure this process is optimized. It is done through each server having a shared inbox so that when 1000 users are on the same server, we just fire a single request for the 1000 of them. ActivityPub being a federated protocol, with a client/server architecture and many users aggregating on a few servers, this ensures the load does not get out of control. Most (all?) implementations support a shared inbox, so it shouldn't be an issue. The logic is implemented so that subscriptions having a shared inbox are prioritized, and there will be at most 100 requests fired per run of the service, requeuing the job if we're not done yet (given the number of servers in the Fediverse, it's unlikely for now that there will be subscribers to a project on more than 100 different servers, but who knows where the Fediverse is going). In case of several batches, to decide if a subscription has been processed, we check its `updated_at` time: a subscription whose `updated_at` is earlier than the release's `released_at` is still pending. If a server is unresponsive, we ignore it this time, so that we don't have to block the queue until it responds properly (which would happen if we let the job fail and retry later).
Later, we'll probably want to bake in something to delete the subscriptions of repeat offenders. Note: we now have two places in the codebase that have similar logic to upload payloads to the user's inbox (here, and where we confirm subscription, implemented in a previous commit), so it's been abstracted away in a concern. This will be reused abundantly. Note 2: We've had a previous discussion about the disabled RuboCop rule. [^2] [^1]: [ADD ActivityPub subscription](https://gitlab.com/gitlab-org/gitlab/-/merge_requests/132460) [^2]: https://gitlab.com/gitlab-org/gitlab/-/issues/426174#note_1640714200 --- .../activity_pub/releases_subscription.rb | 18 ++- .../activity_pub/accept_follow_service.rb | 22 +--- .../send_release_activities_service.rb | 64 +++++++++++ app/services/concerns/activity_pub_request.rb | 21 ++++ .../publish_release_activities_worker.rb | 41 +++++++ app/workers/all_queues.yml | 9 ++ lib/gitlab/event_store.rb | 1 + .../activity_pub/releases_subscriptions.rb | 4 +- .../releases_subscription_spec.rb | 11 ++ .../send_release_activities_service_spec.rb | 104 ++++++++++++++++++ .../publish_release_activities_worker_spec.rb | 91 +++++++++++++++ 11 files changed, 363 insertions(+), 23 deletions(-) create mode 100644 app/services/activity_pub/projects/send_release_activities_service.rb create mode 100644 app/services/concerns/activity_pub_request.rb create mode 100644 app/workers/activity_pub/projects/publish_release_activities_worker.rb create mode 100644 spec/services/activity_pub/projects/send_release_activities_service_spec.rb create mode 100644 spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb diff --git a/app/models/activity_pub/releases_subscription.rb b/app/models/activity_pub/releases_subscription.rb index 0a4293b2bdea71..5e7d34f9c32fef 100644 --- a/app/models/activity_pub/releases_subscription.rb +++ b/app/models/activity_pub/releases_subscription.rb @@ -6,6 +6,14 @@ class ReleasesSubscription < ApplicationRecord enum
:status, [:requested, :accepted], default: :requested + scope :with_shared_inbox, -> { where.not(shared_inbox_url: nil) } + scope :without_shared_inbox, -> { where(shared_inbox_url: nil) } + scope :pending_notification, ->(release) do + where(project_id: release.project_id).where("updated_at < ?", release.released_at) + end + scope :with_limit, ->(maximum) { limit(maximum) } + scope :group_by_shared_inbox, -> { select(:shared_inbox_url).group(:shared_inbox_url) } + attribute :payload, Gitlab::Database::Type::JsonPgSafe.new validates :payload, json_schema: { filename: 'activity_pub_follow_payload' }, allow_blank: true @@ -15,8 +23,14 @@ class ReleasesSubscription < ApplicationRecord public_url: { allow_nil: true } validates :shared_inbox_url, public_url: { allow_nil: true } - def self.find_by_project_and_subscriber(project_id, subscriber_url) - find_by('project_id = ? AND LOWER(subscriber_url) = ?', project_id, subscriber_url.downcase) + class << self + def find_by_project_and_subscriber(project_id, subscriber_url) + find_by('project_id = ? AND LOWER(subscriber_url) = ?', project_id, subscriber_url.downcase) + end + + def touch_shared_inbox(project_id, shared_inbox_url) + where(project_id: project_id, shared_inbox_url: shared_inbox_url).touch_all + end end end end diff --git a/app/services/activity_pub/accept_follow_service.rb b/app/services/activity_pub/accept_follow_service.rb index 0ec440fa972668..6f9b1f3a1d3f2c 100644 --- a/app/services/activity_pub/accept_follow_service.rb +++ b/app/services/activity_pub/accept_follow_service.rb @@ -2,6 +2,8 @@ module ActivityPub class AcceptFollowService + include ActivityPubRequest + MissingInboxURLError = Class.new(StandardError) attr_reader :subscription, :actor @@ -15,22 +17,12 @@ def execute return if subscription.accepted? raise MissingInboxURLError unless subscription.subscriber_inbox_url.present? - upload_accept_activity + upload_activity(payload, subscription.subscriber_inbox_url) subscription.accepted! 
end private - def upload_accept_activity - body = Gitlab::Json::LimitedEncoder.encode(payload, limit: 1.megabyte) - - begin - Gitlab::HTTP.post(subscription.subscriber_inbox_url, body: body, headers: headers) - rescue StandardError => e - raise ThirdPartyError, e.message - end - end - def payload follow = subscription.payload.dup follow.delete('@context') @@ -43,13 +35,5 @@ def payload object: follow } end - - def headers - { - 'User-Agent' => "GitLab/#{Gitlab::VERSION}", - 'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"', - 'Accept' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"' - } - end end end diff --git a/app/services/activity_pub/projects/send_release_activities_service.rb b/app/services/activity_pub/projects/send_release_activities_service.rb new file mode 100644 index 00000000000000..b58e7f4cad0032 --- /dev/null +++ b/app/services/activity_pub/projects/send_release_activities_service.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +module ActivityPub + module Projects + class SendReleaseActivitiesService + include ActivityPubRequest + + MAX_REQUEST_PER_RUN = 100 + + attr_reader :release, :activity + + def initialize(release, activity) + @release = release + @activity = activity + @has_more = true + end + + def execute + subscriptions = ReleasesSubscription.pending_notification(release) + with_shared_inbox = subscriptions.with_shared_inbox.group_by_shared_inbox.with_limit(MAX_REQUEST_PER_RUN) + + if with_shared_inbox.count.present? + notify_shared_inboxes(with_shared_inbox) + else + individuals = subscriptions.without_shared_inbox.with_limit(MAX_REQUEST_PER_RUN) + if individuals.count == 0 + @has_more = false + return + end + + notify_individuals(individuals) + end + end + + def has_more? 
+ @has_more + end + + private + + def notify_shared_inboxes(subscriptions) + subscriptions.each do |subscription| + begin + upload_activity(activity, subscription.shared_inbox_url) + rescue ActivityPub::ThirdPartyError + end + + ReleasesSubscription.touch_shared_inbox(release.project_id, subscription.shared_inbox_url) + end + end + + def notify_individuals(subscriptions) + subscriptions.each do |subscription| + begin + upload_activity(activity, subscription.subscriber_inbox_url) + rescue ActivityPub::ThirdPartyError + end + + subscription.touch + end + end + end + end +end diff --git a/app/services/concerns/activity_pub_request.rb b/app/services/concerns/activity_pub_request.rb new file mode 100644 index 00000000000000..8e7f6c5e6688ea --- /dev/null +++ b/app/services/concerns/activity_pub_request.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +module ActivityPubRequest + def upload_activity(payload, inbox_url) + body = Gitlab::Json::LimitedEncoder.encode(payload, limit: 1.megabyte) + + begin + Gitlab::HTTP.post(inbox_url, body: body, headers: activity_pub_headers) + rescue StandardError => e + raise ActivityPub::ThirdPartyError, e.message + end + end + + def activity_pub_headers + { + 'User-Agent' => "GitLab/#{Gitlab::VERSION}", + 'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"', + 'Accept' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"' + } + end +end diff --git a/app/workers/activity_pub/projects/publish_release_activities_worker.rb b/app/workers/activity_pub/projects/publish_release_activities_worker.rb new file mode 100644 index 00000000000000..0d31d86563483c --- /dev/null +++ b/app/workers/activity_pub/projects/publish_release_activities_worker.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module ActivityPub + module Projects + class PublishReleaseActivitiesWorker + include Gitlab::EventStore::Subscriber + + idempotent! + worker_has_external_dependencies! 
+ feature_category :release_orchestration + data_consistency :delayed + queue_namespace :activity_pub + + def handle_event(event) + release = Release.find_by_id(event.data[:release_id]) + return unless release + + service = SendReleaseActivitiesService.new(release, activity_for(release)) + service.execute + + return unless service.has_more? + + PublishReleaseActivitiesWorker.perform_async(::Projects::ReleasePublishedEvent, event.data) + end + + private + + def activity_for(release) + # Not being able to use serializer here is a serious problem, given the + # activity we send here is the exact same data generated for + # ActivityPub::Projects::ReleasesController#outbox, and the same thing + # will happen for all actors (they provide the same data both in their + # `#outbox` endpoint and when sending out activities from workers). + # + # Disabling the rule for now, we need to discuss that. + serializer = ActivityPub::PublishReleaseActivitySerializer.new # rubocop:disable CodeReuse/Serializer -- see above. + serializer.represent(release) + end + end + end +end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 60ad122288cd34..ca5f2f350e0037 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -3,6 +3,15 @@ # # Do not edit it manually! 
--- +- :name: activity_pub:activity_pub_projects_publish_release_activities + :worker_name: ActivityPub::Projects::PublishReleaseActivitiesWorker + :feature_category: :release_orchestration + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: activity_pub:activity_pub_projects_releases_subscription :worker_name: ActivityPub::Projects::ReleasesSubscriptionWorker :feature_category: :release_orchestration diff --git a/lib/gitlab/event_store.rb b/lib/gitlab/event_store.rb index b422fd061ff93a..36da12f79976bb 100644 --- a/lib/gitlab/event_store.rb +++ b/lib/gitlab/event_store.rb @@ -51,6 +51,7 @@ def self.configure!(store) to: ::Packages::PackageCreatedEvent, if: -> (event) { ::Ml::ExperimentTracking::AssociateMlCandidateToPackageWorker.handles_event?(event) } store.subscribe ::Ci::InitializePipelinesIidSequenceWorker, to: ::Projects::ProjectCreatedEvent + store.subscribe ::ActivityPub::Projects::PublishReleaseActivitiesWorker, to: ::Projects::ReleasePublishedEvent end private_class_method :configure! 
end diff --git a/spec/factories/activity_pub/releases_subscriptions.rb b/spec/factories/activity_pub/releases_subscriptions.rb index b789188528a671..abfa89ea1bacaa 100644 --- a/spec/factories/activity_pub/releases_subscriptions.rb +++ b/spec/factories/activity_pub/releases_subscriptions.rb @@ -3,7 +3,7 @@ FactoryBot.define do factory :activity_pub_releases_subscription, class: 'ActivityPub::ReleasesSubscription' do project - subscriber_url { 'https://example.com/actor' } + subscriber_url { |i| "https://example.com/actor-#{i}" } status { :requested } payload do { @@ -16,7 +16,7 @@ end trait :inbox do - subscriber_inbox_url { 'https://example.com/actor/inbox' } + subscriber_inbox_url { |i| "https://example.com/actor/inbox-#{i}" } end trait :shared_inbox do diff --git a/spec/models/activity_pub/releases_subscription_spec.rb b/spec/models/activity_pub/releases_subscription_spec.rb index 0633f293971b67..7e0bf56a96ec9e 100644 --- a/spec/models/activity_pub/releases_subscription_spec.rb +++ b/spec/models/activity_pub/releases_subscription_spec.rb @@ -90,4 +90,15 @@ expect(result).to be(nil) end end + + describe '.touch_shared_inbox' do + let_it_be(:subscription) { create(:activity_pub_releases_subscription, :shared_inbox, updated_at: 1.month.ago) } + + it 'updates matching records' do + described_class.touch_shared_inbox(subscription.project_id, subscription.shared_inbox_url) + subscription.reload + + expect(subscription.updated_at).to be_within(1.hour).of(Time.current) + end + end end diff --git a/spec/services/activity_pub/projects/send_release_activities_service_spec.rb b/spec/services/activity_pub/projects/send_release_activities_service_spec.rb new file mode 100644 index 00000000000000..7e9fe1e10b3ea5 --- /dev/null +++ b/spec/services/activity_pub/projects/send_release_activities_service_spec.rb @@ -0,0 +1,104 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ActivityPub::Projects::SendReleaseActivitiesService, feature_category: 
:release_orchestration do + let_it_be(:release) { create(:release, released_at: Time.current) } + let_it_be(:shared_inbox) { 'http://example.com/shared-inbox' } + let_it_be(:individual_inbox) { 'http://example.com/actor/inbox' } + + let_it_be_with_reload(:existing_subscription_with_shared_inbox) do + create_list(:activity_pub_releases_subscription, 2, { + shared_inbox_url: shared_inbox, + project: release.project, + created_at: 1.month.ago, + updated_at: 1.month.ago + }) + end + + let_it_be_with_reload(:existing_individual_subscription) do + create(:activity_pub_releases_subscription, { + subscriber_inbox_url: individual_inbox, + project: release.project, + created_at: 1.month.ago, + updated_at: 1.month.ago + }) + end + + let(:activity) do + { + id: 'http://example.com/activity', + '@context': 'https://www.w3.org/ns/activitystreams', + type: 'Create', + actor: 'http://example.com/actor', + object: { + id: 'http://example.com/release', + type: 'Application' + } + } + end + + let(:service) { described_class.new(release, activity) } + + before do + allow(service).to receive(:upload_activity).and_return(true) + end + + describe '#execute' do + describe 'when there are pending subscriptions with shared inbox' do + before do + service.execute + end + + it 'sends activities to the shared inbox' do + expect(service).to have_received(:upload_activity).with(activity, shared_inbox) + end + + it 'updates subscription with shared inbox' do + expect(ActivityPub::ReleasesSubscription.with_shared_inbox.pending_notification(release)).to be_empty + end + + it 'does not update subscription with individual inbox' do + expect(ActivityPub::ReleasesSubscription.without_shared_inbox.pending_notification(release).count).to eq 1 + end + + it 'says there may be more subscriptions to proceed' do + expect(service.has_more?).to be_truthy + end + end + + describe 'when there are pending subscriptions without shared inbox' do + before do + 
ActivityPub::ReleasesSubscription.with_shared_inbox.touch_all + service.execute + end + + it 'sends activities to the individual inboxes' do + expect(service).to have_received(:upload_activity).with(activity, individual_inbox) + end + + it 'updates subscription with individual inbox' do + expect(ActivityPub::ReleasesSubscription.without_shared_inbox.pending_notification(release).count).to eq 0 + end + + it 'says there may be more subscriptions to proceed' do + expect(service.has_more?).to be_truthy + end + end + + describe 'when there is not more pending subscription' do + before do + ActivityPub::ReleasesSubscription.touch_all + service.execute + end + + it 'does not send any activity' do + expect(service).not_to have_received(:upload_activity) + end + + it 'says there is no more subscriptions to proceed' do + expect(service.has_more?).to be_falsey + end + end + end +end diff --git a/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb b/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb new file mode 100644 index 00000000000000..23feae1f445b33 --- /dev/null +++ b/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb @@ -0,0 +1,91 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ActivityPub::Projects::PublishReleaseActivitiesWorker, feature_category: :release_orchestration do + let(:worker) { described_class.new } + + let(:release) { build_stubbed(:release) } + + let(:event) do + ::Projects::ReleasePublishedEvent.new(data: HashWithIndifferentAccess.new(release_id: release.id)) + end + + let(:service) do + instance_double(ActivityPub::Projects::SendReleaseActivitiesService, has_more?: has_more, execute: true) + end + + let(:serializer) { instance_double(ActivityPub::PublishReleaseActivitySerializer, represent: payload) } + let(:has_more) { true } + + let(:payload) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'http://example.com/activity', + 
type: 'Create', + actor: 'http://example.com/actor', + object: { + id: 'http://example.com/release', + type: 'Application' + } + } + end + + before do + allow(ActivityPub::Projects::SendReleaseActivitiesService).to receive(:new) { service } + allow(ActivityPub::PublishReleaseActivitySerializer).to receive(:new) { serializer } + allow(described_class).to receive(:perform_async) + end + + shared_examples_for 'successful run' do + it 'serializes the activity' do + expect(serializer).to have_received(:represent) + end + + it 'calls the activity sending service' do + expect(ActivityPub::Projects::SendReleaseActivitiesService).to have_received(:new).with(release, payload) + expect(service).to have_received(:execute) + end + end + + describe '#handle_event' do + describe 'when the release exists' do + before do + allow(Release).to receive(:find_by_id) { release } + worker.handle_event(event) + end + + describe 'when the job sent the last activities' do + let(:has_more) { false } + + it_behaves_like 'successful run' + + it 'does not queue an other run' do + expect(described_class).not_to have_received(:perform_async) + end + end + + describe 'when there is still more work to do' do + it_behaves_like 'successful run' + + it 'queues an other run' do + expect(described_class).to have_received(:perform_async) + end + end + end + + describe 'when the release does not exist' do + before do + worker.handle_event(event) + end + + it 'does not call the activity sending service' do + expect(service).not_to have_received(:execute) + end + + it 'does not queue an other run' do + expect(described_class).not_to have_received(:perform_async) + end + end + end +end -- GitLab From e44e18479e94fc74641b71644694d3f40a0f4a34 Mon Sep 17 00:00:00 2001 From: Patrick Cyiza Date: Tue, 23 Jan 2024 13:29:01 +0000 Subject: [PATCH 2/9] REFACTOR use named params for SQL query --- app/models/activity_pub/releases_subscription.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/app/models/activity_pub/releases_subscription.rb b/app/models/activity_pub/releases_subscription.rb index 5e7d34f9c32fef..6bc53565f6cc2d 100644 --- a/app/models/activity_pub/releases_subscription.rb +++ b/app/models/activity_pub/releases_subscription.rb @@ -25,7 +25,7 @@ class ReleasesSubscription < ApplicationRecord class << self def find_by_project_and_subscriber(project_id, subscriber_url) - find_by('project_id = ? AND LOWER(subscriber_url) = ?', project_id, subscriber_url.downcase) + find_by('project_id = :project_id AND LOWER(subscriber_url) = :subscriber_url', project_id: project_id, subscriber_url: subscriber_url.downcase) end def touch_shared_inbox(project_id, shared_inbox_url) -- GitLab From 2e663488209c1884d3272bba2a9962ee929d59e2 Mon Sep 17 00:00:00 2001 From: Olivier El Mekki Date: Tue, 23 Jan 2024 14:38:27 +0100 Subject: [PATCH 3/9] REFACTOR rename with_shared_inbox -> shared_inboxes --- .../projects/send_release_activities_service.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/services/activity_pub/projects/send_release_activities_service.rb b/app/services/activity_pub/projects/send_release_activities_service.rb index b58e7f4cad0032..0ff1f7cc3b0d19 100644 --- a/app/services/activity_pub/projects/send_release_activities_service.rb +++ b/app/services/activity_pub/projects/send_release_activities_service.rb @@ -17,10 +17,10 @@ def initialize(release, activity) def execute subscriptions = ReleasesSubscription.pending_notification(release) - with_shared_inbox = subscriptions.with_shared_inbox.group_by_shared_inbox.with_limit(MAX_REQUEST_PER_RUN) + shared_inboxes = subscriptions.with_shared_inbox.group_by_shared_inbox.with_limit(MAX_REQUEST_PER_RUN) - if with_shared_inbox.count.present? - notify_shared_inboxes(with_shared_inbox) + if shared_inboxes.count.present? 
+ notify_shared_inboxes(shared_inboxes) else individuals = subscriptions.without_shared_inbox.with_limit(MAX_REQUEST_PER_RUN) if individuals.count == 0 -- GitLab From 87fff83781665864fab7a4d30983b00db406a1a1 Mon Sep 17 00:00:00 2001 From: Olivier El Mekki Date: Tue, 23 Jan 2024 14:41:38 +0100 Subject: [PATCH 4/9] FIX rubocop complaining --- app/models/activity_pub/releases_subscription.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/models/activity_pub/releases_subscription.rb b/app/models/activity_pub/releases_subscription.rb index 6bc53565f6cc2d..9a381d248e4273 100644 --- a/app/models/activity_pub/releases_subscription.rb +++ b/app/models/activity_pub/releases_subscription.rb @@ -25,7 +25,8 @@ class ReleasesSubscription < ApplicationRecord class << self def find_by_project_and_subscriber(project_id, subscriber_url) - find_by('project_id = :project_id AND LOWER(subscriber_url) = :subscriber_url', project_id: project_id, subscriber_url: subscriber_url.downcase) + find_by('project_id = :project_id AND LOWER(subscriber_url) = :subscriber_url', + project_id: project_id, subscriber_url: subscriber_url.downcase) end def touch_shared_inbox(project_id, shared_inbox_url) -- GitLab From e2af29ba768bae3cf69b751d7fc7a7a003215ec1 Mon Sep 17 00:00:00 2001 From: Patrick Cyiza Date: Tue, 6 Feb 2024 18:01:14 +0000 Subject: [PATCH 5/9] REFACTOR Patrick's review --- .../projects/send_release_activities_service.rb | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/app/services/activity_pub/projects/send_release_activities_service.rb b/app/services/activity_pub/projects/send_release_activities_service.rb index 0ff1f7cc3b0d19..406f6fcb841bb0 100644 --- a/app/services/activity_pub/projects/send_release_activities_service.rb +++ b/app/services/activity_pub/projects/send_release_activities_service.rb @@ -17,19 +17,22 @@ def initialize(release, activity) def execute subscriptions = 
ReleasesSubscription.pending_notification(release) + shared_inboxes = subscriptions.with_shared_inbox.group_by_shared_inbox.with_limit(MAX_REQUEST_PER_RUN) if shared_inboxes.count.present? notify_shared_inboxes(shared_inboxes) - else - individuals = subscriptions.without_shared_inbox.with_limit(MAX_REQUEST_PER_RUN) - if individuals.count == 0 - @has_more = false - return - end + return + end + + individual_inboxes = subscriptions.without_shared_inbox.with_limit(MAX_REQUEST_PER_RUN) - notify_individuals(individuals) + if individual_inboxes.empty? + @has_more = false + return end + + notify_individuals(individual_inboxes) end def has_more? -- GitLab From ea02294ebd3b7bdd0bd89ef8ef01ed483eb71ecc Mon Sep 17 00:00:00 2001 From: Olivier El Mekki Date: Tue, 6 Feb 2024 19:33:07 +0100 Subject: [PATCH 6/9] FIX rubocop --- .../activity_pub/projects/send_release_activities_service.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/services/activity_pub/projects/send_release_activities_service.rb b/app/services/activity_pub/projects/send_release_activities_service.rb index 406f6fcb841bb0..c8e7a7365b2ec5 100644 --- a/app/services/activity_pub/projects/send_release_activities_service.rb +++ b/app/services/activity_pub/projects/send_release_activities_service.rb @@ -24,7 +24,7 @@ def execute notify_shared_inboxes(shared_inboxes) return end - + individual_inboxes = subscriptions.without_shared_inbox.with_limit(MAX_REQUEST_PER_RUN) if individual_inboxes.empty? 
-- GitLab From 7926c87670fe99a379d4ec780a82d3600ffb805c Mon Sep 17 00:00:00 2001 From: Olivier El Mekki Date: Sat, 10 Feb 2024 17:42:23 +0100 Subject: [PATCH 7/9] REFACTOR implement Omar's suggestions --- app/models/activity_pub/releases_subscription.rb | 13 ++++--------- .../projects/send_release_activities_service.rb | 12 +++++------- .../activity_pub/releases_subscription_spec.rb | 11 ----------- 3 files changed, 9 insertions(+), 27 deletions(-) diff --git a/app/models/activity_pub/releases_subscription.rb b/app/models/activity_pub/releases_subscription.rb index 9a381d248e4273..8a7ff94e587f89 100644 --- a/app/models/activity_pub/releases_subscription.rb +++ b/app/models/activity_pub/releases_subscription.rb @@ -13,6 +13,7 @@ class ReleasesSubscription < ApplicationRecord end scope :with_limit, ->(maximum) { limit(maximum) } scope :group_by_shared_inbox, -> { select(:shared_inbox_url).group(:shared_inbox_url) } + scope :for_project_shared_inbox_url, ->(project_id, url) { where(project_id: project_id, shared_inbox_url: url) } attribute :payload, Gitlab::Database::Type::JsonPgSafe.new @@ -23,15 +24,9 @@ class ReleasesSubscription < ApplicationRecord public_url: { allow_nil: true } validates :shared_inbox_url, public_url: { allow_nil: true } - class << self - def find_by_project_and_subscriber(project_id, subscriber_url) - find_by('project_id = :project_id AND LOWER(subscriber_url) = :subscriber_url', - project_id: project_id, subscriber_url: subscriber_url.downcase) - end - - def touch_shared_inbox(project_id, shared_inbox_url) - where(project_id: project_id, shared_inbox_url: shared_inbox_url).touch_all - end + def self.find_by_project_and_subscriber(project_id, subscriber_url) + find_by('project_id = :project_id AND LOWER(subscriber_url) = :subscriber_url', + project_id: project_id, subscriber_url: subscriber_url.downcase) end end end diff --git a/app/services/activity_pub/projects/send_release_activities_service.rb 
b/app/services/activity_pub/projects/send_release_activities_service.rb index c8e7a7365b2ec5..b4454e995e4c68 100644 --- a/app/services/activity_pub/projects/send_release_activities_service.rb +++ b/app/services/activity_pub/projects/send_release_activities_service.rb @@ -48,19 +48,17 @@ def notify_shared_inboxes(subscriptions) rescue ActivityPub::ThirdPartyError end - ReleasesSubscription.touch_shared_inbox(release.project_id, subscription.shared_inbox_url) + ReleasesSubscription.for_project_shared_inbox_url(release.project_id, subscription.shared_inbox_url).touch_all end end def notify_individuals(subscriptions) subscriptions.each do |subscription| - begin - upload_activity(activity, subscription.subscriber_inbox_url) - rescue ActivityPub::ThirdPartyError - end - - subscription.touch + upload_activity(activity, subscription.subscriber_inbox_url) + rescue ActivityPub::ThirdPartyError end + + subscriptions.touch_all end end end diff --git a/spec/models/activity_pub/releases_subscription_spec.rb b/spec/models/activity_pub/releases_subscription_spec.rb index 7e0bf56a96ec9e..0633f293971b67 100644 --- a/spec/models/activity_pub/releases_subscription_spec.rb +++ b/spec/models/activity_pub/releases_subscription_spec.rb @@ -90,15 +90,4 @@ expect(result).to be(nil) end end - - describe '.touch_shared_inbox' do - let_it_be(:subscription) { create(:activity_pub_releases_subscription, :shared_inbox, updated_at: 1.month.ago) } - - it 'updates matching records' do - described_class.touch_shared_inbox(subscription.project_id, subscription.shared_inbox_url) - subscription.reload - - expect(subscription.updated_at).to be_within(1.hour).of(Time.current) - end - end end -- GitLab From 86a9dbfb60400e0fb5a2d5dfd1c5cd651fb960ce Mon Sep 17 00:00:00 2001 From: Olivier El Mekki Date: Sat, 10 Feb 2024 21:03:34 +0100 Subject: [PATCH 8/9] REFACTOR #has_more -> #has_pending_subscriptions --- .../projects/send_release_activities_service.rb | 8 ++++---- 
.../projects/publish_release_activities_worker.rb | 2 +- .../projects/send_release_activities_service_spec.rb | 6 +++--- .../projects/publish_release_activities_worker_spec.rb | 3 ++- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/app/services/activity_pub/projects/send_release_activities_service.rb b/app/services/activity_pub/projects/send_release_activities_service.rb index b4454e995e4c68..59de54f01494fa 100644 --- a/app/services/activity_pub/projects/send_release_activities_service.rb +++ b/app/services/activity_pub/projects/send_release_activities_service.rb @@ -12,7 +12,7 @@ class SendReleaseActivitiesService def initialize(release, activity) @release = release @activity = activity - @has_more = true + @has_pending_subscriptions = true end def execute @@ -28,15 +28,15 @@ def execute individual_inboxes = subscriptions.without_shared_inbox.with_limit(MAX_REQUEST_PER_RUN) if individual_inboxes.empty? - @has_more = false + @has_pending_subscriptions = false return end notify_individuals(individual_inboxes) end - def has_more? - @has_more + def has_pending_subscriptions? + @has_pending_subscriptions end private diff --git a/app/workers/activity_pub/projects/publish_release_activities_worker.rb b/app/workers/activity_pub/projects/publish_release_activities_worker.rb index 0d31d86563483c..3643947d724f95 100644 --- a/app/workers/activity_pub/projects/publish_release_activities_worker.rb +++ b/app/workers/activity_pub/projects/publish_release_activities_worker.rb @@ -18,7 +18,7 @@ def handle_event(event) service = SendReleaseActivitiesService.new(release, activity_for(release)) service.execute - return unless service.has_more? + return unless service.has_pending_subscriptions? 
PublishReleaseActivitiesWorker.perform_async(::Projects::ReleasePublishedEvent, event.data) end diff --git a/spec/services/activity_pub/projects/send_release_activities_service_spec.rb b/spec/services/activity_pub/projects/send_release_activities_service_spec.rb index 7e9fe1e10b3ea5..73d8c9d681953f 100644 --- a/spec/services/activity_pub/projects/send_release_activities_service_spec.rb +++ b/spec/services/activity_pub/projects/send_release_activities_service_spec.rb @@ -63,7 +63,7 @@ end it 'says there may be more subscriptions to proceed' do - expect(service.has_more?).to be_truthy + expect(service.has_pending_subscriptions?).to be_truthy end end @@ -82,7 +82,7 @@ end it 'says there may be more subscriptions to proceed' do - expect(service.has_more?).to be_truthy + expect(service.has_pending_subscriptions?).to be_truthy end end @@ -97,7 +97,7 @@ end it 'says there is no more subscriptions to proceed' do - expect(service.has_more?).to be_falsey + expect(service.has_pending_subscriptions?).to be_falsey end end end diff --git a/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb b/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb index 23feae1f445b33..208ec4995d1d0e 100644 --- a/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb +++ b/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb @@ -12,7 +12,8 @@ end let(:service) do - instance_double(ActivityPub::Projects::SendReleaseActivitiesService, has_more?: has_more, execute: true) + instance_double(ActivityPub::Projects::SendReleaseActivitiesService, has_pending_subscriptions?: has_more, + execute: true) end let(:serializer) { instance_double(ActivityPub::PublishReleaseActivitySerializer, represent: payload) } -- GitLab From 8677b04c4e989fb2d0b4a38e686491eabbe048a1 Mon Sep 17 00:00:00 2001 From: Olivier El Mekki Date: Mon, 12 Feb 2024 19:07:08 +0100 Subject: [PATCH 9/9] FIX touch individual shareboxes individually --- 
.../projects/send_release_activities_service.rb | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/app/services/activity_pub/projects/send_release_activities_service.rb b/app/services/activity_pub/projects/send_release_activities_service.rb index 59de54f01494fa..47ed51cc5d33ae 100644 --- a/app/services/activity_pub/projects/send_release_activities_service.rb +++ b/app/services/activity_pub/projects/send_release_activities_service.rb @@ -54,11 +54,13 @@ def notify_shared_inboxes(subscriptions) def notify_individuals(subscriptions) subscriptions.each do |subscription| - upload_activity(activity, subscription.subscriber_inbox_url) - rescue ActivityPub::ThirdPartyError - end + begin + upload_activity(activity, subscription.subscriber_inbox_url) + rescue ActivityPub::ThirdPartyError + end - subscriptions.touch_all + subscription.touch + end end end end -- GitLab