diff --git a/app/models/activity_pub/releases_subscription.rb b/app/models/activity_pub/releases_subscription.rb
index 0a4293b2bdea71872e26ad9497e3f3fc50fbd4d7..5e7d34f9c32fefdb80bf0d603444e26f8087ba53 100644
--- a/app/models/activity_pub/releases_subscription.rb
+++ b/app/models/activity_pub/releases_subscription.rb
@@ -6,6 +6,14 @@ class ReleasesSubscription < ApplicationRecord
 
     enum :status, [:requested, :accepted], default: :requested
 
+    scope :with_shared_inbox, -> { where.not(shared_inbox_url: nil) } # shared_inbox_url is set
+    scope :without_shared_inbox, -> { where(shared_inbox_url: nil) } # shared_inbox_url is NULL
+    scope :pending_notification, ->(release) do # same project, last updated before the release date
+      where(project_id: release.project_id).where("updated_at < ?", release.released_at)
+    end
+    scope :with_limit, ->(maximum) { limit(maximum) } # named wrapper around `limit`
+    scope :group_by_shared_inbox, -> { select(:shared_inbox_url).group(:shared_inbox_url) } # one row per inbox URL
+
     attribute :payload, Gitlab::Database::Type::JsonPgSafe.new
 
     validates :payload, json_schema: { filename: 'activity_pub_follow_payload' }, allow_blank: true
@@ -15,8 +23,14 @@ class ReleasesSubscription < ApplicationRecord
       public_url: { allow_nil: true }
     validates :shared_inbox_url, public_url: { allow_nil: true }
 
-    def self.find_by_project_and_subscriber(project_id, subscriber_url)
-      find_by('project_id = ? AND LOWER(subscriber_url) = ?', project_id, subscriber_url.downcase)
+    class << self
+      def find_by_project_and_subscriber(project_id, subscriber_url) # subscriber_url is compared case-insensitively
+        find_by('project_id = ? AND LOWER(subscriber_url) = ?', project_id, subscriber_url.downcase)
+      end
+
+      def touch_shared_inbox(project_id, shared_inbox_url) # touches every subscription of the project on that inbox
+        where(project_id: project_id, shared_inbox_url: shared_inbox_url).touch_all
+      end
     end
   end
 end
diff --git a/app/services/activity_pub/accept_follow_service.rb b/app/services/activity_pub/accept_follow_service.rb
index 0ec440fa97266858efa3b32977bb5c2956f355fb..6f9b1f3a1d3f2c7eeac7465991571e11a9e5a1a9 100644
--- a/app/services/activity_pub/accept_follow_service.rb
+++ b/app/services/activity_pub/accept_follow_service.rb
@@ -2,6 +2,8 @@
 
 module ActivityPub
   class AcceptFollowService
+    include ActivityPubRequest # provides upload_activity, replacing the private helpers removed below
+
     MissingInboxURLError = Class.new(StandardError)
 
     attr_reader :subscription, :actor
@@ -15,22 +17,12 @@ def execute
       return if subscription.accepted?
       raise MissingInboxURLError unless subscription.subscriber_inbox_url.present?
 
-      upload_accept_activity
+      upload_activity(payload, subscription.subscriber_inbox_url)
       subscription.accepted!
     end
 
     private
 
-    def upload_accept_activity
-      body = Gitlab::Json::LimitedEncoder.encode(payload, limit: 1.megabyte)
-
-      begin
-        Gitlab::HTTP.post(subscription.subscriber_inbox_url, body: body, headers: headers)
-      rescue StandardError => e
-        raise ThirdPartyError, e.message
-      end
-    end
-
     def payload
       follow = subscription.payload.dup
       follow.delete('@context')
@@ -43,13 +35,5 @@ def payload
         object: follow
       }
     end
-
-    def headers
-      {
-        'User-Agent' => "GitLab/#{Gitlab::VERSION}",
-        'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
-        'Accept' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'
-      }
-    end
   end
 end
diff --git a/app/services/activity_pub/projects/send_release_activities_service.rb b/app/services/activity_pub/projects/send_release_activities_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..b58e7f4cad0032ae2f431b16d6d2d2dd0096d27c
--- /dev/null
+++ 
b/app/services/activity_pub/projects/send_release_activities_service.rb
@@ -0,0 +1,78 @@
+# frozen_string_literal: true
+
+module ActivityPub
+  module Projects
+    # Sends a release activity to one batch of a project's subscribers.
+    #
+    # Every #execute call handles at most MAX_REQUEST_PER_RUN inboxes: shared
+    # inboxes first, individual inboxes once no shared inbox is pending.
+    # Notified subscriptions are touched so `pending_notification` stops
+    # returning them; run the service again while #has_more? is true.
+    class SendReleaseActivitiesService
+      include ActivityPubRequest
+
+      MAX_REQUEST_PER_RUN = 100
+
+      attr_reader :release, :activity
+
+      def initialize(release, activity)
+        @release = release
+        @activity = activity
+        @has_more = true
+      end
+
+      def execute
+        subscriptions = ReleasesSubscription.pending_notification(release)
+
+        # Load each batch once: the emptiness check and the iteration then
+        # share one query, and the check no longer depends on what `count`
+        # returns for a grouped relation.
+        shared_inboxes = subscriptions.with_shared_inbox.group_by_shared_inbox.with_limit(MAX_REQUEST_PER_RUN).load
+
+        if shared_inboxes.any?
+          notify_shared_inboxes(shared_inboxes)
+        else
+          individuals = subscriptions.without_shared_inbox.with_limit(MAX_REQUEST_PER_RUN).load
+          if individuals.empty?
+            @has_more = false
+            return
+          end
+
+          notify_individuals(individuals)
+        end
+      end
+
+      # False only after a run that found no pending subscription at all.
+      def has_more?
+        @has_more
+      end
+
+      private
+
+      def notify_shared_inboxes(subscriptions)
+        subscriptions.each do |subscription|
+          deliver(subscription.shared_inbox_url)
+
+          ReleasesSubscription.touch_shared_inbox(release.project_id, subscription.shared_inbox_url)
+        end
+      end
+
+      def notify_individuals(subscriptions)
+        subscriptions.each do |subscription|
+          deliver(subscription.subscriber_inbox_url)
+
+          subscription.touch
+        end
+      end
+
+      # Best-effort delivery: one failing remote inbox must not abort the
+      # batch, and the subscription is touched either way, so the failure is
+      # reported here instead of being dropped silently.
+      def deliver(inbox_url)
+        upload_activity(activity, inbox_url)
+      rescue ActivityPub::ThirdPartyError => e
+        Gitlab::ErrorTracking.track_exception(e, project_id: release.project_id, release_id: release.id)
+      end
+    end
+  end
+end
diff --git a/app/services/concerns/activity_pub_request.rb b/app/services/concerns/activity_pub_request.rb
new file mode 100644
index 0000000000000000000000000000000000000000..8e7f6c5e6688ea7b7e4d08e00e3cfd1982b1fa1d
--- /dev/null
+++ b/app/services/concerns/activity_pub_request.rb
@@ -0,0 +1,27 @@
+# frozen_string_literal: true
+
+# HTTP delivery of ActivityPub payloads, shared by the services that post
+# activities to remote inboxes.
+module ActivityPubRequest
+  # POSTs `payload` to `inbox_url`, encoded through
+  # Gitlab::Json::LimitedEncoder with a 1 megabyte limit.
+  #
+  # Any StandardError raised by the HTTP call is re-raised as
+  # ActivityPub::ThirdPartyError carrying the original message.
+  def upload_activity(payload, inbox_url)
+    body = Gitlab::Json::LimitedEncoder.encode(payload, limit: 1.megabyte)
+
+    begin
+      Gitlab::HTTP.post(inbox_url, body: body, headers: activity_pub_headers)
+    rescue StandardError => e
+      raise ActivityPub::ThirdPartyError, e.message
+    end
+  end
+
+  def activity_pub_headers
+    {
+      'User-Agent' => "GitLab/#{Gitlab::VERSION}",
+      'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
+      'Accept' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'
+    }
+  end
+end
diff --git a/app/workers/activity_pub/projects/publish_release_activities_worker.rb b/app/workers/activity_pub/projects/publish_release_activities_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..0d31d86563483c1350ffffbd338d2093d7131e94
--- /dev/null
+++ b/app/workers/activity_pub/projects/publish_release_activities_worker.rb
@@ -0,0 +1,44 @@
+# frozen_string_literal: true
+
+module ActivityPub
+  module Projects
+    class PublishReleaseActivitiesWorker
+      include Gitlab::EventStore::Subscriber
+
+      idempotent!
+      worker_has_external_dependencies!
+      feature_category :release_orchestration
+      data_consistency :delayed
+      queue_namespace :activity_pub
+
+      def handle_event(event)
+        release = Release.find_by_id(event.data[:release_id])
+        return unless release
+
+        service = SendReleaseActivitiesService.new(release, activity_for(release))
+        service.execute
+
+        return unless service.has_more?
+
+        # The service handles one batch per run: queue the same event again
+        # for the next batch. Job arguments have to be plain JSON types, so
+        # pass the event class name and string-keyed data.
+        self.class.perform_async(event.class.name, event.data.deep_stringify_keys.to_h)
+      end
+
+      private
+
+      def activity_for(release)
+        # Not being able to use serializer here is a serious problem, given the
+        # activity we send here is the exact same data generated for
+        # ActivityPub::Projects::ReleasesController#outbox, and the same thing
+        # will happen for all actors (they provide the same data both in their
+        # `#outbox` endpoint and when sending out activities from workers).
+        #
+        # Disabling the rule for now, we need to discuss that.
+        serializer = ActivityPub::PublishReleaseActivitySerializer.new # rubocop:disable CodeReuse/Serializer -- see above.
+        serializer.represent(release)
+      end
+    end
+  end
+end
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index dfad9f7f6730e25e912b8c270f5e185581e06654..fee4975c2799fe063c6b527d467a7a63b68a3c94 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -3,6 +3,15 @@
 #
 # Do not edit it manually!
 ---
+- :name: activity_pub:activity_pub_projects_publish_release_activities
+  :worker_name: ActivityPub::Projects::PublishReleaseActivitiesWorker
+  :feature_category: :release_orchestration
+  :has_external_dependencies: true
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: activity_pub:activity_pub_projects_releases_subscription
   :worker_name: ActivityPub::Projects::ReleasesSubscriptionWorker
   :feature_category: :release_orchestration
diff --git a/lib/gitlab/event_store.rb b/lib/gitlab/event_store.rb
index b422fd061ff93a976c0b0b6db80153e5ab2ff147..36da12f79976bbebe0432ee9dc3ee60744d15765 100644
--- a/lib/gitlab/event_store.rb
+++ b/lib/gitlab/event_store.rb
@@ -51,6 +51,7 @@ def self.configure!(store)
         to: ::Packages::PackageCreatedEvent,
         if: -> (event) { ::Ml::ExperimentTracking::AssociateMlCandidateToPackageWorker.handles_event?(event) }
       store.subscribe ::Ci::InitializePipelinesIidSequenceWorker, to: ::Projects::ProjectCreatedEvent
+      store.subscribe ::ActivityPub::Projects::PublishReleaseActivitiesWorker, to: ::Projects::ReleasePublishedEvent
     end
     private_class_method :configure!
end diff --git a/spec/factories/activity_pub/releases_subscriptions.rb b/spec/factories/activity_pub/releases_subscriptions.rb index b789188528a67116013646ec402e79f69e9f90e1..abfa89ea1bacaaf312c78a636081693def0e7f2b 100644 --- a/spec/factories/activity_pub/releases_subscriptions.rb +++ b/spec/factories/activity_pub/releases_subscriptions.rb @@ -3,7 +3,7 @@ FactoryBot.define do factory :activity_pub_releases_subscription, class: 'ActivityPub::ReleasesSubscription' do project - subscriber_url { 'https://example.com/actor' } + subscriber_url { |i| "https://example.com/actor-#{i}" } status { :requested } payload do { @@ -16,7 +16,7 @@ end trait :inbox do - subscriber_inbox_url { 'https://example.com/actor/inbox' } + subscriber_inbox_url { |i| "https://example.com/actor/inbox-#{i}" } end trait :shared_inbox do diff --git a/spec/models/activity_pub/releases_subscription_spec.rb b/spec/models/activity_pub/releases_subscription_spec.rb index 0633f293971b674a663c662e9700b91a22426877..7e0bf56a96ec9e776e1cccd2d740b234bd5b2e54 100644 --- a/spec/models/activity_pub/releases_subscription_spec.rb +++ b/spec/models/activity_pub/releases_subscription_spec.rb @@ -90,4 +90,15 @@ expect(result).to be(nil) end end + + describe '.touch_shared_inbox' do + let_it_be(:subscription) { create(:activity_pub_releases_subscription, :shared_inbox, updated_at: 1.month.ago) } + + it 'updates matching records' do + described_class.touch_shared_inbox(subscription.project_id, subscription.shared_inbox_url) + subscription.reload + + expect(subscription.updated_at).to be_within(1.hour).of(Time.current) + end + end end diff --git a/spec/services/activity_pub/projects/send_release_activities_service_spec.rb b/spec/services/activity_pub/projects/send_release_activities_service_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..7e9fe1e10b3ea5f8ebddc48cff1c8bc2ba225853 --- /dev/null +++ b/spec/services/activity_pub/projects/send_release_activities_service_spec.rb @@ -0,0 +1,104 @@ 
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ActivityPub::Projects::SendReleaseActivitiesService, feature_category: :release_orchestration do
+  let_it_be(:release) { create(:release, released_at: Time.current) }
+  let_it_be(:shared_inbox) { 'http://example.com/shared-inbox' }
+  let_it_be(:individual_inbox) { 'http://example.com/actor/inbox' }
+
+  let_it_be_with_reload(:existing_subscription_with_shared_inbox) do # two subscribers, one shared inbox
+    create_list(:activity_pub_releases_subscription, 2, {
+      shared_inbox_url: shared_inbox,
+      project: release.project,
+      created_at: 1.month.ago,
+      updated_at: 1.month.ago
+    })
+  end
+
+  let_it_be_with_reload(:existing_individual_subscription) do # no shared inbox set
+    create(:activity_pub_releases_subscription, {
+      subscriber_inbox_url: individual_inbox,
+      project: release.project,
+      created_at: 1.month.ago,
+      updated_at: 1.month.ago
+    })
+  end
+
+  let(:activity) do
+    {
+      id: 'http://example.com/activity',
+      '@context': 'https://www.w3.org/ns/activitystreams',
+      type: 'Create',
+      actor: 'http://example.com/actor',
+      object: {
+        id: 'http://example.com/release',
+        type: 'Application'
+      }
+    }
+  end
+
+  let(:service) { described_class.new(release, activity) }
+
+  before do
+    allow(service).to receive(:upload_activity).and_return(true) # no real HTTP request
+  end
+
+  describe '#execute' do
+    describe 'when there are pending subscriptions with shared inbox' do
+      before do
+        service.execute
+      end
+
+      it 'sends activities to the shared inbox' do
+        expect(service).to have_received(:upload_activity).with(activity, shared_inbox)
+      end
+
+      it 'updates subscription with shared inbox' do
+        expect(ActivityPub::ReleasesSubscription.with_shared_inbox.pending_notification(release)).to be_empty
+      end
+
+      it 'does not update subscription with individual inbox' do
+        expect(ActivityPub::ReleasesSubscription.without_shared_inbox.pending_notification(release).count).to eq 1
+      end
+
+      it 'says there may be more subscriptions to proceed' do
+        expect(service.has_more?).to be_truthy
+      end
+    end
+
+    describe 'when there are pending subscriptions without shared inbox' do
+      before do
+        ActivityPub::ReleasesSubscription.with_shared_inbox.touch_all # shared inboxes no longer pending
+        service.execute
+      end
+
+      it 'sends activities to the individual inboxes' do
+        expect(service).to have_received(:upload_activity).with(activity, individual_inbox)
+      end
+
+      it 'updates subscription with individual inbox' do
+        expect(ActivityPub::ReleasesSubscription.without_shared_inbox.pending_notification(release).count).to eq 0
+      end
+
+      it 'says there may be more subscriptions to proceed' do
+        expect(service.has_more?).to be_truthy
+      end
+    end
+
+    describe 'when there is not more pending subscription' do
+      before do
+        ActivityPub::ReleasesSubscription.touch_all # nothing left pending
+        service.execute
+      end
+
+      it 'does not send any activity' do
+        expect(service).not_to have_received(:upload_activity)
+      end
+
+      it 'says there is no more subscriptions to proceed' do
+        expect(service.has_more?).to be_falsey
+      end
+    end
+  end
+end
diff --git a/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb b/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..23feae1f445b33a9a47e8e5d7f9521fe89d15cd1
--- /dev/null
+++ b/spec/workers/activity_pub/projects/publish_release_activities_worker_spec.rb
@@ -0,0 +1,91 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ActivityPub::Projects::PublishReleaseActivitiesWorker, feature_category: :release_orchestration do
+  let(:worker) { described_class.new }
+
+  let(:release) { build_stubbed(:release) }
+
+  let(:event) do
+    ::Projects::ReleasePublishedEvent.new(data: HashWithIndifferentAccess.new(release_id: release.id))
+  end
+
+  let(:service) do
+    instance_double(ActivityPub::Projects::SendReleaseActivitiesService, has_more?: has_more, execute: true)
+  end
+
+  let(:serializer) { instance_double(ActivityPub::PublishReleaseActivitySerializer, represent: payload) }
+  let(:has_more) { true }
+
+  let(:payload) do
+    {
+      '@context': 'https://www.w3.org/ns/activitystreams',
+      id: 'http://example.com/activity',
+      type: 'Create',
+      actor: 'http://example.com/actor',
+      object: {
+        id: 'http://example.com/release',
+        type: 'Application'
+      }
+    }
+  end
+
+  before do
+    allow(ActivityPub::Projects::SendReleaseActivitiesService).to receive(:new) { service }
+    allow(ActivityPub::PublishReleaseActivitySerializer).to receive(:new) { serializer }
+    allow(described_class).to receive(:perform_async) # observe re-queueing without enqueuing a job
+  end
+
+  shared_examples_for 'successful run' do
+    it 'serializes the activity' do
+      expect(serializer).to have_received(:represent)
+    end
+
+    it 'calls the activity sending service' do
+      expect(ActivityPub::Projects::SendReleaseActivitiesService).to have_received(:new).with(release, payload)
+      expect(service).to have_received(:execute)
+    end
+  end
+
+  describe '#handle_event' do
+    describe 'when the release exists' do
+      before do
+        allow(Release).to receive(:find_by_id) { release }
+        worker.handle_event(event)
+      end
+
+      describe 'when the job sent the last activities' do
+        let(:has_more) { false }
+
+        it_behaves_like 'successful run'
+
+        it 'does not queue an other run' do
+          expect(described_class).not_to have_received(:perform_async)
+        end
+      end
+
+      describe 'when there is still more work to do' do
+        it_behaves_like 'successful run'
+
+        it 'queues an other run' do
+          expect(described_class).to have_received(:perform_async)
+        end
+      end
+    end
+
+    describe 'when the release does not exist' do
+      before do
+        worker.handle_event(event) # Release.find_by_id is not stubbed; the release is only build_stubbed
+      end
+
+      it 'does not call the activity sending service' do
+        expect(service).not_to have_received(:execute)
+      end
+
+      it 'does not queue an other run' do
+        expect(described_class).not_to have_received(:perform_async)
+      end
+    end
+  end
+end