From 95b6b2ac994dbb836994168098ddbe5d58b4077b Mon Sep 17 00:00:00 2001
From: Olivier El Mekki
Date: Wed, 8 Nov 2023 15:23:59 +0100
Subject: [PATCH] ADD sending release activities

This implements sending ActivityPub activities when a new release is
published.

We subscribe to the EventStore's `Projects::ReleasePublishedEvent` event and
queue the work to send out up to 100 activities per run, queuing another run
of the job until there is nothing more to process.

To decide whether a subscription has been processed, we check whether its
`updated_at` date is older than the release's `released_at` date. When a
subscription has been processed, we touch it.

Subscriptions with a shared inbox (which should be most of them) are
processed first: we send a single activity to each server's shared inbox,
then touch all subscriptions from that server to mark them as processed.
When we're done with shared inboxes, we process individual inboxes.

If a server is unresponsive, we skip it this time (its subscriptions are
still marked as processed), so that we don't block the queue until it
responds properly.
---
 .../activity_pub/releases_subscription.rb     |  18 ++-
 .../activity_pub/accept_follow_service.rb     |  22 +---
 .../send_release_activities_service.rb        |  64 +++++++++++
 app/services/concerns/activity_pub_request.rb |  21 ++++
 .../publish_release_activities_worker.rb      |  41 +++++++
 app/workers/all_queues.yml                    |   9 ++
 lib/gitlab/event_store.rb                     |   1 +
 .../activity_pub/releases_subscriptions.rb    |   4 +-
 .../releases_subscription_spec.rb             |  11 ++
 .../send_release_activities_service_spec.rb   | 104 ++++++++++++++++++
 .../publish_release_activities_worker_spec.rb |  91 +++++++++++++++
 11 files changed, 363 insertions(+), 23 deletions(-)
 create mode 100644 app/services/activity_pub/projects/send_release_activities_service.rb
 create mode 100644 app/services/concerns/activity_pub_request.rb
 create mode 100644 app/workers/activity_pub/projects/publish_release_activities_worker.rb
 create mode 100644 spec/services/activity_pub/projects/send_release_activities_service_spec.rb
 create mode 100644 spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb

diff --git a/app/models/activity_pub/releases_subscription.rb b/app/models/activity_pub/releases_subscription.rb
index 0a4293b2bdea71..5e7d34f9c32fef 100644
--- a/app/models/activity_pub/releases_subscription.rb
+++ b/app/models/activity_pub/releases_subscription.rb
@@ -6,6 +6,17 @@ class ReleasesSubscription < ApplicationRecord
 
     enum :status, [:requested, :accepted], default: :requested
 
+    scope :with_shared_inbox, -> { where.not(shared_inbox_url: nil) }
+    scope :without_shared_inbox, -> { where(shared_inbox_url: nil) }
+    # Subscriptions of the release's project whose `updated_at` is older than
+    # the release's `released_at`. The column is table-qualified so the scope
+    # stays unambiguous when combined with joins.
+    scope :pending_notification, ->(release) do
+      where(project_id: release.project_id).where(arel_table[:updated_at].lt(release.released_at))
+    end
+    scope :with_limit, ->(maximum) { limit(maximum) }
+    scope :group_by_shared_inbox, -> { select(:shared_inbox_url).group(:shared_inbox_url) }
+
     attribute :payload, Gitlab::Database::Type::JsonPgSafe.new
 
     validates :payload, json_schema: { filename: 'activity_pub_follow_payload' }, allow_blank: true
@@ -15,8 +26,16 @@ class ReleasesSubscription < ApplicationRecord
       public_url: { allow_nil: true }
     validates :shared_inbox_url, public_url: { allow_nil: true }
 
-    def self.find_by_project_and_subscriber(project_id, subscriber_url)
-      find_by('project_id = ? AND LOWER(subscriber_url) = ?', project_id, subscriber_url.downcase)
+    class << self
+      def find_by_project_and_subscriber(project_id, subscriber_url)
+        find_by('project_id = ? AND LOWER(subscriber_url) = ?', project_id, subscriber_url.downcase)
+      end
+
+      # Bumps `updated_at` on every subscription of the project that uses
+      # +shared_inbox_url+.
+      def touch_shared_inbox(project_id, shared_inbox_url)
+        where(project_id: project_id, shared_inbox_url: shared_inbox_url).touch_all
+      end
     end
   end
 end
diff --git a/app/services/activity_pub/accept_follow_service.rb b/app/services/activity_pub/accept_follow_service.rb
index 0ec440fa972668..6f9b1f3a1d3f2c 100644
--- a/app/services/activity_pub/accept_follow_service.rb
+++ b/app/services/activity_pub/accept_follow_service.rb
@@ -2,6 +2,8 @@
 
 module ActivityPub
   class AcceptFollowService
+    include ActivityPubRequest
+
     MissingInboxURLError = Class.new(StandardError)
 
     attr_reader :subscription, :actor
@@ -15,22 +17,12 @@ def execute
       return if subscription.accepted?
       raise MissingInboxURLError unless subscription.subscriber_inbox_url.present?
 
-      upload_accept_activity
+      upload_activity(payload, subscription.subscriber_inbox_url)
       subscription.accepted!
     end
 
     private
 
-    def upload_accept_activity
-      body = Gitlab::Json::LimitedEncoder.encode(payload, limit: 1.megabyte)
-
-      begin
-        Gitlab::HTTP.post(subscription.subscriber_inbox_url, body: body, headers: headers)
-      rescue StandardError => e
-        raise ThirdPartyError, e.message
-      end
-    end
-
     def payload
       follow = subscription.payload.dup
       follow.delete('@context')
@@ -43,13 +35,5 @@ def payload
         object: follow
       }
     end
-
-    def headers
-      {
-        'User-Agent' => "GitLab/#{Gitlab::VERSION}",
-        'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
-        'Accept' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'
-      }
-    end
   end
 end
diff --git a/app/services/activity_pub/projects/send_release_activities_service.rb b/app/services/activity_pub/projects/send_release_activities_service.rb
new file mode 100644
index 00000000000000..b58e7f4cad0032
--- /dev/null
+++ b/app/services/activity_pub/projects/send_release_activities_service.rb
@@ -0,0 +1,88 @@
+# frozen_string_literal: true
+
+module ActivityPub
+  module Projects
+    # Delivers the ActivityPub activity of a published release to the
+    # pending subscriptions of its project, sending at most
+    # MAX_REQUEST_PER_RUN requests per #execute call.
+    #
+    # Subscriptions with a shared inbox are handled first (one request per
+    # shared inbox URL), then those with an individual inbox. After
+    # #execute, #has_more? tells the caller whether to run it again.
+    class SendReleaseActivitiesService
+      include ActivityPubRequest
+
+      MAX_REQUEST_PER_RUN = 100
+
+      attr_reader :release, :activity
+
+      # @param release [Release] the release being announced
+      # @param activity [Hash] the activity posted to every inbox
+      def initialize(release, activity)
+        @release = release
+        @activity = activity
+        @has_more = true
+      end
+
+      # Processes one batch of pending subscriptions and touches each of
+      # them, whether or not the delivery succeeded.
+      #
+      # NOTE(review): `pending_notification` selects subscriptions whose
+      # `updated_at` is older than the release's `released_at`. If
+      # `released_at` can be in the future, touching a subscription does
+      # not take it out of that set, so every run would send the activity
+      # again - confirm this is never triggered for upcoming releases.
+      def execute
+        subscriptions = ReleasesSubscription.pending_notification(release)
+        with_shared_inbox = subscriptions.with_shared_inbox.group_by_shared_inbox.with_limit(MAX_REQUEST_PER_RUN)
+
+        # `exists?` rather than `count.present?`: `count` only returns a
+        # Hash because of the GROUP BY; an Integer count is `present?`
+        # even when it is zero.
+        if with_shared_inbox.exists?
+          notify_shared_inboxes(with_shared_inbox)
+        else
+          individuals = subscriptions.without_shared_inbox.with_limit(MAX_REQUEST_PER_RUN)
+          if individuals.empty?
+            @has_more = false
+            return
+          end
+
+          notify_individuals(individuals)
+        end
+      end
+
+      # @return [Boolean] false once #execute found nothing left to process
+      def has_more?
+        @has_more
+      end
+
+      private
+
+      def notify_shared_inboxes(subscriptions)
+        subscriptions.each do |subscription|
+          deliver(subscription.shared_inbox_url)
+
+          ReleasesSubscription.touch_shared_inbox(release.project_id, subscription.shared_inbox_url)
+        end
+      end
+
+      def notify_individuals(subscriptions)
+        subscriptions.each do |subscription|
+          deliver(subscription.subscriber_inbox_url)
+
+          subscription.touch
+        end
+      end
+
+      # Best-effort delivery: a failing remote server is skipped so that
+      # it cannot block the queue. Callers touch the subscriptions
+      # regardless of the outcome.
+      def deliver(inbox_url)
+        upload_activity(activity, inbox_url)
+      rescue ActivityPub::ThirdPartyError
+        nil
+      end
+    end
+  end
+end
diff --git a/app/services/concerns/activity_pub_request.rb b/app/services/concerns/activity_pub_request.rb
new file mode 100644
index 00000000000000..8e7f6c5e6688ea
--- /dev/null
+++ b/app/services/concerns/activity_pub_request.rb
@@ -0,0 +1,31 @@
+# frozen_string_literal: true
+
+# Helpers for services that deliver ActivityPub activities to remote inboxes.
+module ActivityPubRequest
+  # POSTs the JSON-encoded +payload+ to +inbox_url+.
+  #
+  # The HTTP response status is not checked: only exceptions raised while
+  # sending the request are reported, as ActivityPub::ThirdPartyError.
+  #
+  # @param payload [Hash] the activity to send
+  # @param inbox_url [String] the remote inbox URL
+  # @raise [ActivityPub::ThirdPartyError] when the HTTP request raises
+  def upload_activity(payload, inbox_url)
+    body = Gitlab::Json::LimitedEncoder.encode(payload, limit: 1.megabyte)
+
+    begin
+      Gitlab::HTTP.post(inbox_url, body: body, headers: activity_pub_headers)
+    rescue StandardError => e
+      raise ActivityPub::ThirdPartyError, e.message
+    end
+  end
+
+  # Headers sent with every request made by #upload_activity.
+  def activity_pub_headers
+    {
+      'User-Agent' => "GitLab/#{Gitlab::VERSION}",
+      'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
+      'Accept' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'
+    }
+  end
+end
diff --git a/app/workers/activity_pub/projects/publish_release_activities_worker.rb b/app/workers/activity_pub/projects/publish_release_activities_worker.rb
new file mode 100644
index 00000000000000..0d31d86563483c
--- /dev/null
+++ b/app/workers/activity_pub/projects/publish_release_activities_worker.rb
@@ -0,0 +1,44 @@
+# frozen_string_literal: true
+
+module ActivityPub
+  module Projects
+    # Event store subscriber for Projects::ReleasePublishedEvent: sends the
+    # release's ActivityPub activity to the project's subscribers one bounded
+    # batch per run, and re-enqueues itself while more work may remain.
+    class PublishReleaseActivitiesWorker
+      include Gitlab::EventStore::Subscriber
+
+      idempotent!
+      worker_has_external_dependencies!
+      feature_category :release_orchestration
+      data_consistency :delayed
+      queue_namespace :activity_pub
+
+      def handle_event(event)
+        release = Release.find_by_id(event.data[:release_id])
+        return unless release
+
+        service = SendReleaseActivitiesService.new(release, activity_for(release))
+        service.execute
+
+        return unless service.has_more?
+
+        PublishReleaseActivitiesWorker.perform_async(event.class.name, event.data.deep_stringify_keys.to_h)
+      end
+
+      private
+
+      def activity_for(release)
+        # Not being able to use serializer here is a serious problem, given the
+        # activity we send here is the exact same data generated for
+        # ActivityPub::Projects::ReleasesController#outbox, and the same thing
+        # will happen for all actors (they provide the same data both in their
+        # `#outbox` endpoint and when sending out activities from workers).
+        #
+        # Disabling the rule for now, we need to discuss that.
+        serializer = ActivityPub::PublishReleaseActivitySerializer.new # rubocop:disable CodeReuse/Serializer -- see above.
+        serializer.represent(release)
+      end
+    end
+  end
+end
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index dfad9f7f6730e2..fee4975c2799fe 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -3,6 +3,15 @@
 #
 # Do not edit it manually!
 ---
+- :name: activity_pub:activity_pub_projects_publish_release_activities
+  :worker_name: ActivityPub::Projects::PublishReleaseActivitiesWorker
+  :feature_category: :release_orchestration
+  :has_external_dependencies: true
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: activity_pub:activity_pub_projects_releases_subscription
   :worker_name: ActivityPub::Projects::ReleasesSubscriptionWorker
   :feature_category: :release_orchestration
diff --git a/lib/gitlab/event_store.rb b/lib/gitlab/event_store.rb
index b422fd061ff93a..36da12f79976bb 100644
--- a/lib/gitlab/event_store.rb
+++ b/lib/gitlab/event_store.rb
@@ -51,6 +51,7 @@ def self.configure!(store)
         to: ::Packages::PackageCreatedEvent,
         if: -> (event) { ::Ml::ExperimentTracking::AssociateMlCandidateToPackageWorker.handles_event?(event) }
       store.subscribe ::Ci::InitializePipelinesIidSequenceWorker, to: ::Projects::ProjectCreatedEvent
+      store.subscribe ::ActivityPub::Projects::PublishReleaseActivitiesWorker, to: ::Projects::ReleasePublishedEvent
     end
     private_class_method :configure!
   end
diff --git a/spec/factories/activity_pub/releases_subscriptions.rb b/spec/factories/activity_pub/releases_subscriptions.rb
index b789188528a671..abfa89ea1bacaa 100644
--- a/spec/factories/activity_pub/releases_subscriptions.rb
+++ b/spec/factories/activity_pub/releases_subscriptions.rb
@@ -3,7 +3,7 @@
 FactoryBot.define do
   factory :activity_pub_releases_subscription, class: 'ActivityPub::ReleasesSubscription' do
     project
-    subscriber_url { 'https://example.com/actor' }
+    sequence(:subscriber_url) { |n| "https://example.com/actor-#{n}" }
     status { :requested }
     payload do
       {
@@ -16,7 +16,7 @@
     end
 
     trait :inbox do
-      subscriber_inbox_url { 'https://example.com/actor/inbox' }
+      sequence(:subscriber_inbox_url) { |n| "https://example.com/actor/inbox-#{n}" }
     end
 
     trait :shared_inbox do
diff --git a/spec/models/activity_pub/releases_subscription_spec.rb b/spec/models/activity_pub/releases_subscription_spec.rb
index 0633f293971b67..7e0bf56a96ec9e 100644
--- a/spec/models/activity_pub/releases_subscription_spec.rb
+++ b/spec/models/activity_pub/releases_subscription_spec.rb
@@ -90,4 +90,15 @@
       expect(result).to be(nil)
     end
   end
+
+  describe '.touch_shared_inbox' do
+    let_it_be(:subscription) { create(:activity_pub_releases_subscription, :shared_inbox, updated_at: 1.month.ago) }
+
+    it 'updates matching records' do
+      described_class.touch_shared_inbox(subscription.project_id, subscription.shared_inbox_url)
+      subscription.reload
+
+      expect(subscription.updated_at).to be_within(1.hour).of(Time.current)
+    end
+  end
 end
diff --git a/spec/services/activity_pub/projects/send_release_activities_service_spec.rb b/spec/services/activity_pub/projects/send_release_activities_service_spec.rb
new file mode 100644
index 00000000000000..7e9fe1e10b3ea5
--- /dev/null
+++ b/spec/services/activity_pub/projects/send_release_activities_service_spec.rb
@@ -0,0 +1,115 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ActivityPub::Projects::SendReleaseActivitiesService, feature_category: :release_orchestration do
+  let_it_be(:release) { create(:release, released_at: Time.current) }
+  let_it_be(:shared_inbox) { 'http://example.com/shared-inbox' }
+  let_it_be(:individual_inbox) { 'http://example.com/actor/inbox' }
+
+  let_it_be_with_reload(:existing_subscription_with_shared_inbox) do
+    create_list(:activity_pub_releases_subscription, 2, {
+      shared_inbox_url: shared_inbox,
+      project: release.project,
+      created_at: 1.month.ago,
+      updated_at: 1.month.ago
+    })
+  end
+
+  let_it_be_with_reload(:existing_individual_subscription) do
+    create(:activity_pub_releases_subscription, {
+      subscriber_inbox_url: individual_inbox,
+      project: release.project,
+      created_at: 1.month.ago,
+      updated_at: 1.month.ago
+    })
+  end
+
+  let(:activity) do
+    {
+      id: 'http://example.com/activity',
+      '@context': 'https://www.w3.org/ns/activitystreams',
+      type: 'Create',
+      actor: 'http://example.com/actor',
+      object: {
+        id: 'http://example.com/release',
+        type: 'Application'
+      }
+    }
+  end
+
+  let(:service) { described_class.new(release, activity) }
+
+  before do
+    allow(service).to receive(:upload_activity).and_return(true)
+  end
+
+  describe '#execute' do
+    describe 'when there are pending subscriptions with shared inbox' do
+      before do
+        service.execute
+      end
+
+      it 'sends activities to the shared inbox' do
+        expect(service).to have_received(:upload_activity).with(activity, shared_inbox)
+      end
+
+      it 'updates subscription with shared inbox' do
+        expect(ActivityPub::ReleasesSubscription.with_shared_inbox.pending_notification(release)).to be_empty
+      end
+
+      it 'does not update subscription with individual inbox' do
+        expect(ActivityPub::ReleasesSubscription.without_shared_inbox.pending_notification(release).count).to eq 1
+      end
+
+      it 'says there may be more subscriptions to proceed' do
+        expect(service.has_more?).to be_truthy
+      end
+    end
+
+    describe 'when there are pending subscriptions without shared inbox' do
+      before do
+        ActivityPub::ReleasesSubscription.with_shared_inbox.touch_all
+        service.execute
+      end
+
+      it 'sends activities to the individual inboxes' do
+        expect(service).to have_received(:upload_activity).with(activity, individual_inbox)
+      end
+
+      it 'updates subscription with individual inbox' do
+        expect(ActivityPub::ReleasesSubscription.without_shared_inbox.pending_notification(release).count).to eq 0
+      end
+
+      it 'says there may be more subscriptions to proceed' do
+        expect(service.has_more?).to be_truthy
+      end
+    end
+
+    describe 'when there is not more pending subscription' do
+      before do
+        ActivityPub::ReleasesSubscription.touch_all
+        service.execute
+      end
+
+      it 'does not send any activity' do
+        expect(service).not_to have_received(:upload_activity)
+      end
+
+      it 'says there is no more subscriptions to proceed' do
+        expect(service.has_more?).to be_falsey
+      end
+    end
+
+    describe 'when sending an activity fails' do
+      before do
+        allow(service).to receive(:upload_activity).and_raise(ActivityPub::ThirdPartyError)
+        service.execute
+      end
+
+      it 'still marks the subscriptions with shared inbox as processed' do
+        expect(ActivityPub::ReleasesSubscription.with_shared_inbox.pending_notification(release)).to be_empty
+      end
+    end
+  end
+end
diff --git a/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb b/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb
new file mode 100644
index 00000000000000..23feae1f445b33
--- /dev/null
+++ b/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb
@@ -0,0 +1,91 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ActivityPub::Projects::PublishReleaseActivitiesWorker, feature_category: :release_orchestration do
+  let(:worker) { described_class.new }
+
+  let(:release) { build_stubbed(:release) }
+
+  let(:event) do
+    ::Projects::ReleasePublishedEvent.new(data: HashWithIndifferentAccess.new(release_id: release.id))
+  end
+
+  let(:service) do
+    instance_double(ActivityPub::Projects::SendReleaseActivitiesService, has_more?: has_more, execute: true)
+  end
+
+  let(:serializer) { instance_double(ActivityPub::PublishReleaseActivitySerializer, represent: payload) }
+  let(:has_more) { true }
+
+  let(:payload) do
+    {
+      '@context': 'https://www.w3.org/ns/activitystreams',
+      id: 'http://example.com/activity',
+      type: 'Create',
+      actor: 'http://example.com/actor',
+      object: {
+        id: 'http://example.com/release',
+        type: 'Application'
+      }
+    }
+  end
+
+  before do
+    allow(ActivityPub::Projects::SendReleaseActivitiesService).to receive(:new) { service }
+    allow(ActivityPub::PublishReleaseActivitySerializer).to receive(:new) { serializer }
+    allow(described_class).to receive(:perform_async)
+  end
+
+  shared_examples_for 'successful run' do
+    it 'serializes the activity' do
+      expect(serializer).to have_received(:represent)
+    end
+
+    it 'calls the activity sending service' do
+      expect(ActivityPub::Projects::SendReleaseActivitiesService).to have_received(:new).with(release, payload)
+      expect(service).to have_received(:execute)
+    end
+  end
+
+  describe '#handle_event' do
+    describe 'when the release exists' do
+      before do
+        allow(Release).to receive(:find_by_id) { release }
+        worker.handle_event(event)
+      end
+
+      describe 'when the job sent the last activities' do
+        let(:has_more) { false }
+
+        it_behaves_like 'successful run'
+
+        it 'does not queue an other run' do
+          expect(described_class).not_to have_received(:perform_async)
+        end
+      end
+
+      describe 'when there is still more work to do' do
+        it_behaves_like 'successful run'
+
+        it 'queues an other run' do
+          expect(described_class).to have_received(:perform_async)
+        end
+      end
+    end
+
+    describe 'when the release does not exist' do
+      before do
+        worker.handle_event(event)
+      end
+
+      it 'does not call the activity sending service' do
+        expect(service).not_to have_received(:execute)
+      end
+
+      it 'does not queue an other run' do
+        expect(described_class).not_to have_received(:perform_async)
+      end
+    end
+  end
+end
-- 
GitLab