From 17627c5a313b36694977b04e5fe878e47666b31f Mon Sep 17 00:00:00 2001 From: Fabio Pitino Date: Sat, 6 Jun 2020 20:45:18 +0100 Subject: [PATCH 1/8] Introduce Gitlab::EventStore pub-sub A simple pub-sub system that leverages Sidekiq workers as subscribers. Changelog: other --- app/events/ci/pipeline_created_event.rb | 14 + app/services/ci/create_pipeline_service.rb | 9 +- app/workers/all_queues.yml | 9 + .../update_head_pipeline_worker.rb | 23 ++ ..._head_pipeline_for_merge_request_worker.rb | 2 + .../ci_publish_pipeline_events.yml | 8 + config/sidekiq_queues.yml | 2 + doc/development/event_store.md | 248 ++++++++++++++++++ doc/development/index.md | 1 + lib/gitlab/event_store.rb | 41 +++ lib/gitlab/event_store/event.rb | 54 ++++ lib/gitlab/event_store/store.rb | 54 ++++ lib/gitlab/event_store/subscriber.rb | 36 +++ lib/gitlab/event_store/subscription.rb | 41 +++ spec/lib/gitlab/event_store/event_spec.rb | 58 ++++ spec/lib/gitlab/event_store/store_spec.rb | 179 +++++++++++++ .../ci/create_pipeline_service_spec.rb | 128 ++------- .../update_head_pipeline_worker_spec.rb | 138 ++++++++++ 18 files changed, 933 insertions(+), 112 deletions(-) create mode 100644 app/events/ci/pipeline_created_event.rb create mode 100644 app/workers/merge_requests/update_head_pipeline_worker.rb create mode 100644 config/feature_flags/development/ci_publish_pipeline_events.yml create mode 100644 doc/development/event_store.md create mode 100644 lib/gitlab/event_store.rb create mode 100644 lib/gitlab/event_store/event.rb create mode 100644 lib/gitlab/event_store/store.rb create mode 100644 lib/gitlab/event_store/subscriber.rb create mode 100644 lib/gitlab/event_store/subscription.rb create mode 100644 spec/lib/gitlab/event_store/event_spec.rb create mode 100644 spec/lib/gitlab/event_store/store_spec.rb create mode 100644 spec/workers/merge_requests/update_head_pipeline_worker_spec.rb diff --git a/app/events/ci/pipeline_created_event.rb b/app/events/ci/pipeline_created_event.rb new file mode 
100644 index 00000000000000..8b971b63cea7b5 --- /dev/null +++ b/app/events/ci/pipeline_created_event.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module Ci + class PipelineCreatedEvent < ::Gitlab::EventStore::Event + def schema + { + 'type' => 'object', + 'properties' => { + 'pipeline_id' => { 'type' => 'integer' } + } + } + end + end +end diff --git a/app/services/ci/create_pipeline_service.rb b/app/services/ci/create_pipeline_service.rb index c1f35afba40f6b..5338f047051c29 100644 --- a/app/services/ci/create_pipeline_service.rb +++ b/app/services/ci/create_pipeline_service.rb @@ -95,7 +95,14 @@ def execute(source, ignore_skip_ci: false, save_on_errors: true, trigger_request .build! if pipeline.persisted? - schedule_head_pipeline_update + if Feature.enabled?(:ci_publish_pipeline_events, pipeline.project, default_enabled: :yaml) + Gitlab::EventStore.publish( + Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id }) + ) + else + schedule_head_pipeline_update + end + create_namespace_onboarding_action else # If pipeline is not persisted, try to recover IID diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 21c8107b8e5021..0d4b92a50658bf 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -2420,6 +2420,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: merge_requests_update_head_pipeline + :worker_name: MergeRequests::UpdateHeadPipelineWorker + :feature_category: :code_review + :has_external_dependencies: + :urgency: :high + :resource_boundary: :cpu + :weight: 1 + :idempotent: true + :tags: [] - :name: metrics_dashboard_prune_old_annotations :worker_name: Metrics::Dashboard::PruneOldAnnotationsWorker :feature_category: :metrics diff --git a/app/workers/merge_requests/update_head_pipeline_worker.rb b/app/workers/merge_requests/update_head_pipeline_worker.rb new file mode 100644 index 00000000000000..c8dc9d1f7c8bb7 --- /dev/null +++ b/app/workers/merge_requests/update_head_pipeline_worker.rb @@ -0,0 
+1,23 @@ +# frozen_string_literal: true + +module MergeRequests + class UpdateHeadPipelineWorker + include ApplicationWorker + include Gitlab::EventStore::Subscriber + + feature_category :code_review + urgency :high + worker_resource_boundary :cpu + data_consistency :always + + idempotent! + + def handle_event(event) + Ci::Pipeline.find_by_id(event.data[:pipeline_id]).try do |pipeline| + pipeline.all_merge_requests.opened.each do |merge_request| + UpdateHeadPipelineForMergeRequestWorker.perform_async(merge_request.id) + end + end + end + end +end diff --git a/app/workers/update_head_pipeline_for_merge_request_worker.rb b/app/workers/update_head_pipeline_for_merge_request_worker.rb index 61fe278e016a37..3a2447b2108320 100644 --- a/app/workers/update_head_pipeline_for_merge_request_worker.rb +++ b/app/workers/update_head_pipeline_for_merge_request_worker.rb @@ -8,8 +8,10 @@ class UpdateHeadPipelineForMergeRequestWorker sidekiq_options retry: 3 include PipelineQueue + # NOTE: this worker belongs to :code_review since there is no CI logic. 
queue_namespace :pipeline_processing feature_category :continuous_integration + urgency :high worker_resource_boundary :cpu diff --git a/config/feature_flags/development/ci_publish_pipeline_events.yml b/config/feature_flags/development/ci_publish_pipeline_events.yml new file mode 100644 index 00000000000000..2d47084f49940c --- /dev/null +++ b/config/feature_flags/development/ci_publish_pipeline_events.yml @@ -0,0 +1,8 @@ +--- +name: ci_publish_pipeline_events +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/34042 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/336752 +milestone: '14.3' +type: development +group: group::pipeline execution +default_enabled: false diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 49989e022fa93c..6395f051a0a2c2 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -255,6 +255,8 @@ - 1 - - merge_requests_sync_code_owner_approval_rules - 1 +- - merge_requests_update_head_pipeline + - 1 - - metrics_dashboard_prune_old_annotations - 1 - - metrics_dashboard_sync_dashboards diff --git a/doc/development/event_store.md b/doc/development/event_store.md new file mode 100644 index 00000000000000..dcb6259aaa1311 --- /dev/null +++ b/doc/development/event_store.md @@ -0,0 +1,248 @@ +--- +stage: none +group: unassigned +info: To determine the technical writer assigned to the Stage/Group associated with this page, see https://about.gitlab.com/handbook/engineering/ux/technical-writing/#assignments +--- + +# GitLab EventStore + +## Background + +The monolithic GitLab project is becoming larger and more domains are being defined. +As a result, these domains are becoming entangled with each other due to temporal coupling. + +An emblematic example is the [`PostReceive`](https://gitlab.com/gitlab-org/gitlab/blob/master/app/workers/post_receive.rb) +worker where a lot happens across multiple domains. 
If a new behavior reacts to +a new commit being pushed, then we add code somewhere in `PostReceive` or its sub-components +(`Git::ProcessRefChangesService`, for example). + +This type of architecture: + +- Is a violation of the Single Responsibility Principle. +- Increases the risk of adding code to a codebase you are not familiar with. + There may be nuances you don't know about which may introduce bugs or a performance degradation. +- Violates domain boundaries. Inside a specific namespace (for example `Git::`) we suddenly see + classes from other domains chiming in (like `Ci::` or `MergeRequests::`). + +## What is EventStore? + +`Gitlab::EventStore` is a basic pub-sub system built on top of the existing Sidekiq workers and observability we have today. +We use this system to apply an event-driven approach when modeling a domain while keeping coupling +to a minimum. + +This essentially leaves the existing Sidekiq workers as-is to perform asynchronous work but inverts +the dependency. + +### EventStore example + +When a CI pipeline is created we update the head pipeline for any merge request matching the +pipeline's `ref`. The merge request can then display the status of the latest pipeline. + +#### Without the EventStore + +We change `Ci::CreatePipelineService` and add logic (like an `if` statement) to check if the +pipeline is created. Then we schedule a worker to run some side-effects for the `MergeRequests::` domain. 
+ +This style violates the [Open-Closed Principle](https://en.wikipedia.org/wiki/Open%E2%80%93closed_principle) +and unnecessarily adds side-effects logic from other domains, increasing coupling: + +```mermaid +graph LR + subgraph ci[CI] + cp[CreatePipelineService] + end + + subgraph mr[MergeRequests] + upw[UpdateHeadPipelineWorker] + end + + subgraph no[Namespaces::Onboarding] + pow[PipelinesOnboardedWorker] + end + + cp -- perform_async --> upw + cp -- perform_async --> pow +``` + +#### With the EventStore + +`Ci::CreatePipelineService` publishes an event `Ci::PipelineCreatedEvent` and its responsibility stops here. + +The `MergeRequests::` domain can subscribe to this event with a worker `MergeRequests::UpdateHeadPipelineWorker`, so: + +- Side-effects are scheduled asynchronously and don't impact the main business transaction that + emits the domain event. +- More side-effects can be added without modifying the main business transaction. +- We can clearly see what domains are involved and their ownership. +- We can identify what events occur in the system because they are explicitly declared. + +With `Gitlab::EventStore` there is still coupling between the subscriber (Sidekiq worker) and the schema of the domain event. +This level of coupling is much smaller than having the main transaction (`Ci::CreatePipelineService`) coupled to: + +- multiple subscribers. +- multiple ways of invoking subscribers (including conditional invocations). +- multiple ways of passing parameters. + +```mermaid +graph LR + subgraph ci[CI] + cp[CreatePipelineService] + cp -- publish --> e[PipelineCreatedEvent] + end + + subgraph mr[MergeRequests] + upw[UpdateHeadPipelineWorker] + end + + subgraph no[Namespaces::Onboarding] + pow[PipelinesOnboardedWorker] + end + + upw -. subscribe .-> e + pow -. subscribe .-> e +``` + +## EventStore advantages + +- Subscribers (Sidekiq workers) can be set to run quicker by changing the worker weight + if the side-effect is critical. 
+- Automatically enforce the fact that side-effects run asynchronously. + This makes it safe for other domains to subscribe to events without affecting the performance of the + main business transaction. + +## Define an event + +An `Event` object represents a domain event that occurred in a bounded context. +Notify other bounded contexts about something +that happened by publishing events, so that they can react to it. + +Define new event classes under `app/events/<namespace>/` with a name representing something that happened in the past: + +```ruby +class Ci::PipelineCreatedEvent < Gitlab::EventStore::Event + def schema + { + 'type' => 'object', + 'required' => ['pipeline_id'], + 'properties' => { + 'pipeline_id' => { 'type' => 'integer' }, + 'ref' => { 'type' => 'string' } + } + } + end +end +``` + +The schema is validated immediately when we initialize the event object so we can ensure that +publishers follow the contract with the subscribers. + +We recommend using optional properties as much as possible, which require fewer rollouts for schema changes. +However, `required` properties could be used for unique identifiers of the event's subject. For example: + +- `pipeline_id` can be a required property for a `Ci::PipelineCreatedEvent`. +- `project_id` can be a required property for a `Projects::ProjectDeletedEvent`. + +Publish only properties that are needed by the subscribers without tailoring the payload to specific subscribers. +The payload should fully represent the event and not contain loosely related properties. For example: + +```ruby +Ci::PipelineCreatedEvent.new(data: { + pipeline_id: pipeline.id, + # unless all subscribers need merge request IDs, + # this is data that can be fetched by the subscriber. + merge_request_ids: pipeline.all_merge_requests.pluck(:id) +}) +``` + +Publishing events with more properties provides the subscribers with the data +they need in the first place. Otherwise subscribers have to fetch the additional data from the database. 
+However, this can lead to continuous changes to the schema and possibly adding properties that may not +represent the single source of truth. +It's best to use this technique as a performance optimization, like when an event has many +subscribers that all fetch the same data again from the database. + +### Update the schema + +Changes to the schema require multiple rollouts. While the new version is being deployed: + +- Existing publishers can publish events using the old version. +- Existing subscribers can consume events using the old version. +- Events get persisted in the Sidekiq queue as job arguments, so we could have 2 versions of the schema during deployments. + +#### Add properties + +1. Rollout 1: + - Add new properties as optional (not `required`). + - Update the subscriber so it can consume events with and without the new properties. +1. Rollout 2: + - Change the publisher to provide the new property +1. Rollout 3: (if the property should be `required`): + - Change the schema and the subscriber code to always expect it. + +#### Remove properties + +1. Rollout 1: + - If the property is `required`, make it optional. + - Update the subscriber so it does not always expect the property. +1. Rollout 2: + - Remove the property from the event publishing. + - Remove the code from the subscriber that processes the property. + +#### Other changes + +For other changes, like renaming a property, use the same steps: + +1. Remove the old property +1. Add the new property + +## Publish an event + +To publish the event from the [previous example](#define-an-event): + +```ruby +Gitlab::EventStore.publish( + Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id }) +) +``` + +## Create a subscriber + +A subscriber is a Sidekiq worker that includes the `Gitlab::EventStore::Subscriber` module. +This module takes care of the `perform` method and provides a better abstraction to handle +the event safely via the `handle_event` method. 
For example: + +```ruby +module MergeRequests + class UpdateHeadPipelineWorker + include ApplicationWorker + include Gitlab::EventStore::Subscriber + + def handle_event(event) + Ci::Pipeline.find_by_id(event.data[:pipeline_id]).try do |pipeline| + # ... + end + end + end +end +``` + +## Register the subscriber to the event + +To subscribe the worker to a specific event in `lib/gitlab/event_store.rb`, +add a line like this to the `Gitlab::EventStore.configure!` method: + +```ruby +module Gitlab + module EventStore + def self.configure! + Store.new do |store| + # ... + + store.subscribe ::MergeRequests::UpdateHeadPipelineWorker, to: ::Ci::PipelineCreatedEvent + + # ... + end + end + end +end +``` diff --git a/doc/development/index.md b/doc/development/index.md index 1398104abda6c7..a31bc2d5a6f412 100644 --- a/doc/development/index.md +++ b/doc/development/index.md @@ -164,6 +164,7 @@ the [reviewer values](https://about.gitlab.com/handbook/engineering/workflow/rev ### General - [Directory structure](directory_structure.md) +- [GitLab EventStore](event_store.md) to publish/subscribe to domain events - [GitLab utilities](utilities.md) - [Newlines style guide](newlines_styleguide.md) - [Logging](logging.md) diff --git a/lib/gitlab/event_store.rb b/lib/gitlab/event_store.rb new file mode 100644 index 00000000000000..578224c30ca0c3 --- /dev/null +++ b/lib/gitlab/event_store.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +# Gitlab::EventStore is a simple pub-sub mechanism that lets you publish +# domain events and use Sidekiq workers as event handlers. +# +# It can be used to decouple domains from different bounded contexts +# by publishing domain events and let any interested parties subscribe +# to them. +# +module Gitlab + module EventStore + Error = Class.new(StandardError) + InvalidEvent = Class.new(Error) + InvalidSubscriber = Class.new(Error) + + def self.publish(event) + instance.publish(event) + end + + def self.instance + @instance ||= configure! 
+ end + + # Define all event subscriptions using: + # + # store.subscribe(DomainA::SomeWorker, to: DomainB::SomeEvent) + # + # It is possible to subscribe to a subset of events matching a condition: + # + # store.subscribe(DomainA::SomeWorker, to: DomainB::SomeEvent, if: ->(event) { event.data == :some_value }) + # + def self.configure! + Store.new do |store| + ### + # Add subscriptions here: + + store.subscribe ::MergeRequests::UpdateHeadPipelineWorker, to: ::Ci::PipelineCreatedEvent + end + end + end +end diff --git a/lib/gitlab/event_store/event.rb b/lib/gitlab/event_store/event.rb new file mode 100644 index 00000000000000..e5b445ceb0988e --- /dev/null +++ b/lib/gitlab/event_store/event.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +# An Event object represents a domain event that occurred in a bounded context. +# By publishing events we notify other bounded contexts about something +# that happened, so that they can react to it. +# +# Define new event classes under `app/events/<namespace>/` with a name +# representing something that happened in the past: +# +# class Projects::ProjectCreatedEvent < Gitlab::EventStore::Event +# def schema +# { +# 'type' => 'object', +# 'properties' => { +# 'project_id' => { 'type' => 'integer' } +# } +# } +# end +# end +# +# To publish it: +# +# Gitlab::EventStore.publish( +# Projects::ProjectCreatedEvent.new(data: { project_id: project.id }) +# ) +# +module Gitlab + module EventStore + class Event + attr_reader :data + + def initialize(data:) + validate_schema!(data) + @data = data + end + + def schema + raise NotImplementedError, 'must specify schema to validate the event' + end + + private + + def validate_schema!(data) + unless data.is_a?(Hash) + raise Gitlab::EventStore::InvalidEvent, "Event data must be a Hash" + end + + unless JSONSchemer.schema(schema).valid?(data.deep_stringify_keys) + raise Gitlab::EventStore::InvalidEvent, "Data for event #{self.class} does not match the defined schema: #{data}" + end + end + end + end 
+end diff --git a/lib/gitlab/event_store/store.rb b/lib/gitlab/event_store/store.rb new file mode 100644 index 00000000000000..ecf3cd7e562a03 --- /dev/null +++ b/lib/gitlab/event_store/store.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +module Gitlab + module EventStore + class Store + attr_reader :subscriptions + + def initialize + @subscriptions = Hash.new { |h, k| h[k] = [] } + + yield(self) if block_given? + + # freeze the subscriptions as safety measure to avoid further + # subscriptions after initialization. + lock! + end + + def subscribe(worker, to:, if: nil) + condition = binding.local_variable_get('if') + + Array(to).each do |event| + validate_subscription!(worker, event) + subscriptions[event] << Gitlab::EventStore::Subscription.new(worker, condition) + end + end + + def publish(event) + unless event.is_a?(Event) + raise InvalidEvent, "Event being published is not an instance of Gitlab::EventStore::Event: got #{event.inspect}" + end + + subscriptions.fetch(event.class, []).each do |subscription| + subscription.consume_event(event) + end + end + + private + + def lock! + @subscriptions.freeze + end + + def validate_subscription!(subscriber, event_class) + unless event_class < Event + raise InvalidEvent, "Event being subscribed to is not a subclass of Gitlab::EventStore::Event: got #{event_class}" + end + + unless subscriber.respond_to?(:perform_async) + raise InvalidSubscriber, "Subscriber is not an ApplicationWorker: got #{subscriber}" + end + end + end + end +end diff --git a/lib/gitlab/event_store/subscriber.rb b/lib/gitlab/event_store/subscriber.rb new file mode 100644 index 00000000000000..e90597d1f95545 --- /dev/null +++ b/lib/gitlab/event_store/subscriber.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +# This module should be included in order to turn an ApplicationWorker +# into a Subscriber. +# This module overrides the `perform` method and provides a better and +# safer interface for handling events via `handle_event` method. 
+# +# @example: +# class SomeEventSubscriber +# include ApplicationWorker +# include Gitlab::EventStore::Subscriber +# +# def handle_event(event) +# # ... +# end +# end + +module Gitlab + module EventStore + module Subscriber + def perform(event_type, data) + raise InvalidEvent, event_type unless self.class.const_defined?(event_type) + + event = event_type.constantize.new( + data: data.with_indifferent_access + ) + + handle_event(event) + end + + def handle_event(event) + raise NotImplementedError, 'you must implement this method in order to handle events' + end + end + end +end diff --git a/lib/gitlab/event_store/subscription.rb b/lib/gitlab/event_store/subscription.rb new file mode 100644 index 00000000000000..c59e90fdb119c9 --- /dev/null +++ b/lib/gitlab/event_store/subscription.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Gitlab + module EventStore + class Subscription + attr_reader :worker, :condition + + def initialize(worker, condition) + @worker = worker + @condition = condition + end + + def consume_event(event) + return unless condition_met?(event) + + if Gitlab::Database.main.inside_transaction? + raise ::Gitlab::EventStore::InvalidEvent, "Event #{event.class} cannot be published inside a transaction" + end + + worker.perform_async(event.class.name, event.data) + # TODO: Log dispatching of events to subscriber + + # We rescue and track any exceptions here because we don't want to + # impact other subscribers if one is faulty. + # The method `condition_met?`, since it can run a block, it might encounter + # a bug. By raising an exception here we could interrupt the publishing + # process, preventing other subscribers from consuming the event. 
+ rescue => e # rubocop: disable Style/RescueStandardError + Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e) + end + + private + + def condition_met?(event) + return true unless condition + + condition.call(event) + end + end + end +end diff --git a/spec/lib/gitlab/event_store/event_spec.rb b/spec/lib/gitlab/event_store/event_spec.rb new file mode 100644 index 00000000000000..c9ab11059fec56 --- /dev/null +++ b/spec/lib/gitlab/event_store/event_spec.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Gitlab::EventStore::Event do + before do + stub_const('TestEvent', Class.new(described_class)) + + TestEvent.class_eval do + def schema + { + 'required' => ['project_id'], + 'type' => 'object', + 'properties' => { + 'project_id' => { 'type' => 'integer' }, + 'project_path' => { 'type' => 'string' } + } + } + end + end + end + + let(:event) { TestEvent.new(data: data) } + + describe 'schema validation' do + context 'when data matches the schema' do + let(:data) { { project_id: 123, project_path: 'org/the-project' } } + + it 'initializes the event correctly' do + expect(event.data).to eq(data) + end + end + + context 'when required properties are present as well as unknown properties' do + let(:data) { { project_id: 123, unknown_key: 'unknown_value' } } + + it 'initializes the event correctly' do + expect(event.data).to eq(data) + end + end + + context 'when some properties are missing' do + let(:data) { { project_path: 'org/the-project' } } + + it 'expects all properties to be present' do + expect { event }.to raise_error(Gitlab::EventStore::InvalidEvent, /does not match the defined schema/) + end + end + + context 'when data is not a Hash' do + let(:data) { 123 } + + it 'raises an error' do + expect { event }.to raise_error(Gitlab::EventStore::InvalidEvent, 'Event data must be a Hash') + end + end + end +end diff --git a/spec/lib/gitlab/event_store/store_spec.rb b/spec/lib/gitlab/event_store/store_spec.rb new file 
mode 100644 index 00000000000000..100bd7c5dfa10b --- /dev/null +++ b/spec/lib/gitlab/event_store/store_spec.rb @@ -0,0 +1,179 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Gitlab::EventStore::Store do + let(:event_klass) { stub_const('TestEvent', Class.new(Gitlab::EventStore::Event)) } + let(:event) { event_klass.new(data: data) } + + let(:another_event_klass) { stub_const('TestAnotherEvent', Class.new(Gitlab::EventStore::Event)) } + + let(:worker) { double(:worker, perform_async: nil) } + let(:another_worker) { double(:another_worker, perform_async: nil) } + + before do + event_klass.class_eval do + def schema + { + 'required' => %w[name id], + 'type' => 'object', + 'properties' => { + 'name' => { 'type' => 'string' }, + 'id' => { 'type' => 'integer' } + } + } + end + end + end + + describe '#subscribe' do + it 'subscribes a worker to an event' do + store = described_class.new do |s| + s.subscribe worker, to: event_klass + end + + subscriptions = store.subscriptions[event_klass] + expect(subscriptions.map(&:worker)).to contain_exactly(worker) + end + + it 'subscribes multiple workers to an event' do + store = described_class.new do |s| + s.subscribe worker, to: event_klass + s.subscribe another_worker, to: event_klass + end + + subscriptions = store.subscriptions[event_klass] + expect(subscriptions.map(&:worker)).to contain_exactly(worker, another_worker) + end + + it 'subscribes a worker to multiple events is separate calls' do + store = described_class.new do |s| + s.subscribe worker, to: event_klass + s.subscribe worker, to: another_event_klass + end + + subscriptions = store.subscriptions[event_klass] + expect(subscriptions.map(&:worker)).to contain_exactly(worker) + + subscriptions = store.subscriptions[another_event_klass] + expect(subscriptions.map(&:worker)).to contain_exactly(worker) + end + + it 'subscribes a worker to multiple events in a single call' do + store = described_class.new do |s| + s.subscribe worker, to: 
[event_klass, another_event_klass] + end + + subscriptions = store.subscriptions[event_klass] + expect(subscriptions.map(&:worker)).to contain_exactly(worker) + + subscriptions = store.subscriptions[another_event_klass] + expect(subscriptions.map(&:worker)).to contain_exactly(worker) + end + + it 'subscribes a worker to an event with condition' do + store = described_class.new do |s| + s.subscribe worker, to: event_klass, if: ->(event) { true } + end + + subscriptions = store.subscriptions[event_klass] + + expect(subscriptions.size).to eq(1) + + subscription = subscriptions.first + expect(subscription).to be_an_instance_of(Gitlab::EventStore::Subscription) + expect(subscription.worker).to eq(worker) + end + + it 'refuses the subscription if the target is not an Event object' do + expect do + described_class.new do |s| + s.subscribe worker, to: Integer + end + end.to raise_error( + Gitlab::EventStore::Error, + /Event being subscribed to is not a subclass of Gitlab::EventStore::Event/) + end + + it 'refuses the subscription if the subscriber is not a worker' do + expect do + described_class.new do |s| + s.subscribe double, to: event_klass + end + end.to raise_error( + Gitlab::EventStore::Error, + /Subscriber is not an ApplicationWorker/) + end + end + + describe '#publish' do + let(:data) { { name: 'Bob', id: 123 } } + + context 'when event has subscribed workers' do + let(:store) do + described_class.new do |store| + store.subscribe worker, to: event_klass + store.subscribe another_worker, to: event_klass + end + end + + it 'dispatches the event to each subscribed worker' do + expect(worker).to receive(:perform_async).with('TestEvent', data) + expect(another_worker).to receive(:perform_async).with('TestEvent', data) + + store.publish(event) + end + + context 'when an error is raised' do + before do + allow(worker).to receive(:perform_async).and_raise(NoMethodError, 'the error message') + end + + it 'is rescued and tracked' do + expect(Gitlab::ErrorTracking) + .to 
receive(:track_and_raise_for_dev_exception) + .and_call_original + + expect { store.publish(event) }.to raise_error(NoMethodError, 'the error message') + end + end + + it 'raises and tracks an error when event is published inside a database transaction' do + expect(Gitlab::ErrorTracking) + .to receive(:track_and_raise_for_dev_exception) + .and_call_original + + expect do + ApplicationRecord.transaction do + store.publish(event) + end + end.to raise_error(Gitlab::EventStore::InvalidEvent, /cannot be published inside a transaction/) + end + + it 'refuses publishing if the target is not an Event object' do + expect { store.publish(double(:event)) } + .to raise_error( + Gitlab::EventStore::Error, + /Event being published is not an instance of Gitlab::EventStore::Event/) + end + end + + context 'when event has subscribed workers with condition' do + let(:store) do + described_class.new do |s| + s.subscribe worker, to: event_klass, if: -> (event) { event.data[:name] == 'Bob' } + s.subscribe another_worker, to: event_klass, if: -> (event) { event.data[:name] == 'Alice' } + end + end + + let(:event) { event_klass.new(data: data) } + + it 'dispatches the event to the workers satisfying the condition' do + expect(worker).to receive(:perform_async).with('TestEvent', data) + expect(another_worker).not_to receive(:perform_async) + + store.publish(event) + end + end + end +end diff --git a/spec/services/ci/create_pipeline_service_spec.rb b/spec/services/ci/create_pipeline_service_spec.rb index ef879d536c35ee..98ec02d59c613d 100644 --- a/spec/services/ci/create_pipeline_service_spec.rb +++ b/spec/services/ci/create_pipeline_service_spec.rb @@ -146,138 +146,44 @@ def execute_service( end context 'when merge requests already exist for this source branch' do - let(:merge_request_1) do + let!(:merge_request_1) do create(:merge_request, source_branch: 'feature', target_branch: "master", source_project: project) end - let(:merge_request_2) do + let!(:merge_request_2) do 
create(:merge_request, source_branch: 'feature', target_branch: "v1.1.0", source_project: project) end - context 'when related merge request is already merged' do - let!(:merged_merge_request) do - create(:merge_request, source_branch: 'master', target_branch: "branch_2", source_project: project, state: 'merged') - end - - it 'does not schedule update head pipeline job' do - expect(UpdateHeadPipelineForMergeRequestWorker).not_to receive(:perform_async).with(merged_merge_request.id) - - execute_service - end - end - context 'when the head pipeline sha equals merge request sha' do it 'updates head pipeline of each merge request', :sidekiq_might_not_need_inline do - merge_request_1 - merge_request_2 - head_pipeline = execute_service(ref: 'feature', after: nil).payload expect(merge_request_1.reload.head_pipeline).to eq(head_pipeline) expect(merge_request_2.reload.head_pipeline).to eq(head_pipeline) end - end - - context 'when the head pipeline sha does not equal merge request sha' do - it 'does not update the head piepeline of MRs' do - merge_request_1 - merge_request_2 - - allow_any_instance_of(Ci::Pipeline).to receive(:latest?).and_return(true) - - expect { execute_service(after: 'ae73cb07c9eeaf35924a10f713b364d32b2dd34f') }.not_to raise_error - - last_pipeline = Ci::Pipeline.last - - expect(merge_request_1.reload.head_pipeline).not_to eq(last_pipeline) - expect(merge_request_2.reload.head_pipeline).not_to eq(last_pipeline) - end - end - - context 'when there is no pipeline for source branch' do - it "does not update merge request head pipeline" do - merge_request = create(:merge_request, source_branch: 'feature', - target_branch: "branch_1", - source_project: project) - - head_pipeline = execute_service.payload - - expect(merge_request.reload.head_pipeline).not_to eq(head_pipeline) - end - end - - context 'when merge request target project is different from source project' do - let!(:project) { fork_project(target_project, nil, repository: true) } - 
let!(:target_project) { create(:project, :repository) } - let!(:user) { create(:user) } - - before do - project.add_developer(user) - end - - it 'updates head pipeline for merge request', :sidekiq_might_not_need_inline do - merge_request = create(:merge_request, source_branch: 'feature', - target_branch: "master", - source_project: project, - target_project: target_project) - - head_pipeline = execute_service(ref: 'feature', after: nil).payload - - expect(merge_request.reload.head_pipeline).to eq(head_pipeline) - end - end - - context 'when the pipeline is not the latest for the branch' do - it 'does not update merge request head pipeline' do - merge_request = create(:merge_request, source_branch: 'master', - target_branch: "branch_1", - source_project: project) - - allow_any_instance_of(MergeRequest) - .to receive(:find_actual_head_pipeline) { } - execute_service + # TODO: remove after ci_publish_pipeline_events FF is removed + # https://gitlab.com/gitlab-org/gitlab/-/issues/336752 + it 'does not schedule sync update for the head pipeline of the merge request' do + expect(UpdateHeadPipelineForMergeRequestWorker) + .not_to receive(:perform_async) - expect(merge_request.reload.head_pipeline).to be_nil + execute_service(ref: 'feature', after: nil) end end - context 'when pipeline has errors' do + context 'when feature flag ci_publish_pipeline_events is disabled' do before do - stub_ci_pipeline_yaml_file('some invalid syntax') + stub_feature_flags(ci_publish_pipeline_events: false) end - it 'updates merge request head pipeline reference', :sidekiq_might_not_need_inline do - merge_request = create(:merge_request, source_branch: 'master', - target_branch: 'feature', - source_project: project) - - head_pipeline = execute_service.payload - - expect(head_pipeline).to be_persisted - expect(head_pipeline.yaml_errors).to be_present - expect(head_pipeline.messages).to be_present - expect(merge_request.reload.head_pipeline).to eq head_pipeline - end - end - - context 'when 
pipeline has been skipped' do - before do - allow_any_instance_of(Ci::Pipeline) - .to receive(:git_commit_message) - .and_return('some commit [ci skip]') - end - - it 'updates merge request head pipeline', :sidekiq_might_not_need_inline do - merge_request = create(:merge_request, source_branch: 'master', - target_branch: 'feature', - source_project: project) - - head_pipeline = execute_service.payload + it 'schedules update for the head pipeline of the merge request' do + expect(UpdateHeadPipelineForMergeRequestWorker) + .to receive(:perform_async).with(merge_request_1.id) + expect(UpdateHeadPipelineForMergeRequestWorker) + .to receive(:perform_async).with(merge_request_2.id) - expect(head_pipeline).to be_skipped - expect(head_pipeline).to be_persisted - expect(merge_request.reload.head_pipeline).to eq head_pipeline + execute_service(ref: 'feature', after: nil) end end end @@ -1655,7 +1561,7 @@ def previous_commit_sha_from_ref(ref) expect(pipeline.target_sha).to be_nil end - it 'schedules update for the head pipeline of the merge request' do + it 'schedules update for the head pipeline of the merge request', :sidekiq_inline do expect(UpdateHeadPipelineForMergeRequestWorker) .to receive(:perform_async).with(merge_request.id) diff --git a/spec/workers/merge_requests/update_head_pipeline_worker_spec.rb b/spec/workers/merge_requests/update_head_pipeline_worker_spec.rb new file mode 100644 index 00000000000000..f3ea14ad53953a --- /dev/null +++ b/spec/workers/merge_requests/update_head_pipeline_worker_spec.rb @@ -0,0 +1,138 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe MergeRequests::UpdateHeadPipelineWorker do + include ProjectForksHelper + + let_it_be(:project) { create(:project, :repository) } + + let(:ref) { 'master' } + let(:pipeline) { create(:ci_pipeline, project: project, ref: ref) } + let(:event) { Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id }) } + + subject { consume_event(event) } + + def consume_event(event) + 
described_class.new.perform(event.class.name, event.data) + end + + context 'when merge requests already exist for this source branch', :sidekiq_inline do + let(:merge_request_1) do + create(:merge_request, source_branch: 'feature', target_branch: "master", source_project: project) + end + + let(:merge_request_2) do + create(:merge_request, source_branch: 'feature', target_branch: "v1.1.0", source_project: project) + end + + context 'when related merge request is already merged' do + let!(:merged_merge_request) do + create(:merge_request, source_branch: 'master', target_branch: "branch_2", source_project: project, state: 'merged') + end + + it 'does not schedule update head pipeline job' do + expect(UpdateHeadPipelineForMergeRequestWorker).not_to receive(:perform_async).with(merged_merge_request.id) + + subject + end + end + + context 'when the head pipeline sha equals merge request sha' do + let(:ref) { 'feature' } + + before do + pipeline.update!(sha: project.repository.commit(ref).id) + end + + it 'updates head pipeline of each merge request' do + merge_request_1 + merge_request_2 + + subject + + expect(merge_request_1.reload.head_pipeline).to eq(pipeline) + expect(merge_request_2.reload.head_pipeline).to eq(pipeline) + end + end + + context 'when the head pipeline sha does not equal merge request sha' do + let(:ref) { 'feature' } + + it 'does not update the head piepeline of MRs' do + merge_request_1 + merge_request_2 + + subject + + expect(merge_request_1.reload.head_pipeline).not_to eq(pipeline) + expect(merge_request_2.reload.head_pipeline).not_to eq(pipeline) + end + end + + context 'when there is no pipeline for source branch' do + it "does not update merge request head pipeline" do + merge_request = create(:merge_request, source_branch: 'feature', + target_branch: "branch_1", + source_project: project) + + subject + + expect(merge_request.reload.head_pipeline).not_to eq(pipeline) + end + end + + context 'when merge request target project is different from 
source project' do + let(:project) { fork_project(target_project, nil, repository: true) } + let(:target_project) { create(:project, :repository) } + let(:user) { create(:user) } + let(:ref) { 'feature' } + + before do + project.add_developer(user) + pipeline.update!(sha: project.repository.commit(ref).id) + end + + it 'updates head pipeline for merge request' do + merge_request = create(:merge_request, source_branch: 'feature', + target_branch: "master", + source_project: project, + target_project: target_project) + + subject + + expect(merge_request.reload.head_pipeline).to eq(pipeline) + end + end + + context 'when the pipeline is not the latest for the branch' do + it 'does not update merge request head pipeline' do + merge_request = create(:merge_request, source_branch: 'master', + target_branch: "branch_1", + source_project: project) + + create(:ci_pipeline, project: pipeline.project, ref: pipeline.ref) + + subject + + expect(merge_request.reload.head_pipeline).to be_nil + end + end + + context 'when pipeline has errors' do + before do + pipeline.update!(yaml_errors: 'some errors', status: :failed) + end + + it 'updates merge request head pipeline reference' do + merge_request = create(:merge_request, source_branch: 'master', + target_branch: 'feature', + source_project: project) + + subject + + expect(merge_request.reload.head_pipeline).to eq(pipeline) + end + end + end +end -- GitLab From 355b9ccc125c6a7d0e725a12d151178400ef5863 Mon Sep 17 00:00:00 2001 From: Fabio Pitino Date: Fri, 8 Oct 2021 13:28:21 +0100 Subject: [PATCH 2/8] Clarify how Sidekiq is used under the hood --- doc/development/event_store.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/doc/development/event_store.md b/doc/development/event_store.md index dcb6259aaa1311..7ef0a49baac042 100644 --- a/doc/development/event_store.md +++ b/doc/development/event_store.md @@ -102,6 +102,21 @@ graph LR pow -. 
subscribe .-> e ``` +Each subscriber, being itself a Sidekiq worker, can specify any attributes that are related +to the type of work they are responsible for. For example, one subscriber could define +`urgency: high` while another one less critical could set `urgency: low`. + +The EventStore is only an abstraction that allows us to have Dependency Inversion. This helps +separate a business transaction from its side effects (often executed in other domains). + +When an event is published, the EventStore calls `perform_async` on each subscribed worker, +passing in the event information as arguments. This essentially schedules a Sidekiq job on each +subscriber's queue. + +This means that nothing else changes with regards to how subscribers work, as they are just +Sidekiq workers. For example: if a worker (subscriber) fails to execute a job, the job is put +back into Sidekiq queue as many times as the retries set for the given worker. + ## EventStore advantages - Subscribers (Sidekiq workers) can be set to run quicker by changing the worker weight @@ -169,6 +184,9 @@ Changes to the schema require multiple rollouts. While the new version is being - Existing subscribers can consume events using the old version. - Events get persisted in the Sidekiq queue as job arguments, so we could have 2 versions of the schema during deployments. +As changing the schema ultimately impacts the Sidekiq arguments, please refer to our +[Sidekiq style guide](sidekiq_style_guide.md#changing-the-arguments-for-a-worker) with regards to multiple rollouts. + #### Add properties 1. Rollout 1: @@ -246,3 +264,6 @@ module Gitlab end end ``` + +Subscriptions are stored in memory when the Rails app is loaded and they are immediately frozen. +It's not possible to modify subscriptions at runtime. 
-- GitLab From e8d391d0adbaa826e271705e4cbee9712a7d961f Mon Sep 17 00:00:00 2001 From: Fabio Pitino Date: Fri, 12 Nov 2021 14:01:18 +0000 Subject: [PATCH 3/8] Reuse existing Sidekiq errors from inside transaction --- lib/gitlab/event_store/subscription.rb | 8 ++------ spec/lib/gitlab/event_store/store_spec.rb | 18 ++++++++++++++---- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/lib/gitlab/event_store/subscription.rb b/lib/gitlab/event_store/subscription.rb index c59e90fdb119c9..e5c92ab969f9be 100644 --- a/lib/gitlab/event_store/subscription.rb +++ b/lib/gitlab/event_store/subscription.rb @@ -13,10 +13,6 @@ def initialize(worker, condition) def consume_event(event) return unless condition_met?(event) - if Gitlab::Database.main.inside_transaction? - raise ::Gitlab::EventStore::InvalidEvent, "Event #{event.class} cannot be published inside a transaction" - end - worker.perform_async(event.class.name, event.data) # TODO: Log dispatching of events to subscriber @@ -25,8 +21,8 @@ def consume_event(event) # The method `condition_met?`, since it can run a block, it might encounter # a bug. By raising an exception here we could interrupt the publishing # process, preventing other subscribers from consuming the event. 
- rescue => e # rubocop: disable Style/RescueStandardError - Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e) + rescue StandardError => e + Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e, event_class: event.class.name, event_data: event.data) end private diff --git a/spec/lib/gitlab/event_store/store_spec.rb b/spec/lib/gitlab/event_store/store_spec.rb index 100bd7c5dfa10b..0d42d1e1f9652f 100644 --- a/spec/lib/gitlab/event_store/store_spec.rb +++ b/spec/lib/gitlab/event_store/store_spec.rb @@ -5,11 +5,19 @@ RSpec.describe Gitlab::EventStore::Store do let(:event_klass) { stub_const('TestEvent', Class.new(Gitlab::EventStore::Event)) } let(:event) { event_klass.new(data: data) } - let(:another_event_klass) { stub_const('TestAnotherEvent', Class.new(Gitlab::EventStore::Event)) } - let(:worker) { double(:worker, perform_async: nil) } - let(:another_worker) { double(:another_worker, perform_async: nil) } + let(:worker) do + stub_const('EventSubscriber', Class.new).tap do |klass| + klass.class_eval { include ApplicationWorker } + end + end + + let(:another_worker) do + stub_const('AnotherEventSubscriber', Class.new).tap do |klass| + klass.class_eval { include ApplicationWorker } + end + end before do event_klass.class_eval do @@ -132,6 +140,7 @@ def schema it 'is rescued and tracked' do expect(Gitlab::ErrorTracking) .to receive(:track_and_raise_for_dev_exception) + .with(kind_of(NoMethodError), event_class: event.class.name, event_data: event.data) .and_call_original expect { store.publish(event) }.to raise_error(NoMethodError, 'the error message') @@ -141,13 +150,14 @@ def schema it 'raises and tracks an error when event is published inside a database transaction' do expect(Gitlab::ErrorTracking) .to receive(:track_and_raise_for_dev_exception) + .at_least(:once) .and_call_original expect do ApplicationRecord.transaction do store.publish(event) end - end.to raise_error(Gitlab::EventStore::InvalidEvent, /cannot be published inside a transaction/) + 
end.to raise_error(Sidekiq::Worker::EnqueueFromTransactionError) end it 'refuses publishing if the target is not an Event object' do -- GitLab From 1a7d9c8d4e66141ada924eb6fb67373567c4e403 Mon Sep 17 00:00:00 2001 From: Fabio Pitino Date: Fri, 12 Nov 2021 15:53:32 +0000 Subject: [PATCH 4/8] Improve specs around publishing --- spec/lib/gitlab/event_store/store_spec.rb | 36 +++++++++++++++++++---- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/spec/lib/gitlab/event_store/store_spec.rb b/spec/lib/gitlab/event_store/store_spec.rb index 0d42d1e1f9652f..bd5332b65e325a 100644 --- a/spec/lib/gitlab/event_store/store_spec.rb +++ b/spec/lib/gitlab/event_store/store_spec.rb @@ -19,6 +19,12 @@ end end + let(:unrelated_worker) do + stub_const('UnrelatedEventSubscriber', Class.new).tap do |klass| + klass.class_eval { include ApplicationWorker } + end + end + before do event_klass.class_eval do def schema @@ -81,7 +87,7 @@ def schema it 'subscribes a worker to an event with condition' do store = described_class.new do |s| - s.subscribe worker, to: event_klass, if: ->(event) { true } + s.subscribe worker, to: event_klass, if: ->(event) { event.data[:name] == 'Alice' } end subscriptions = store.subscriptions[event_klass] @@ -91,6 +97,8 @@ def schema subscription = subscriptions.first expect(subscription).to be_an_instance_of(Gitlab::EventStore::Subscription) expect(subscription.worker).to eq(worker) + expect(subscription.condition.call(double(data: { name: 'Bob' }))).to eq(false) + expect(subscription.condition.call(double(data: { name: 'Alice' }))).to eq(true) end it 'refuses the subscription if the target is not an Event object' do @@ -117,21 +125,39 @@ def schema describe '#publish' do let(:data) { { name: 'Bob', id: 123 } } - context 'when event has subscribed workers' do + context 'when event has a subscribed worker' do let(:store) do described_class.new do |store| store.subscribe worker, to: event_klass - store.subscribe another_worker, to: event_klass + 
store.subscribe another_worker, to: another_event_klass end end - it 'dispatches the event to each subscribed worker' do + it 'dispatches the event to the subscribed worker' do expect(worker).to receive(:perform_async).with('TestEvent', data) - expect(another_worker).to receive(:perform_async).with('TestEvent', data) + expect(another_worker).not_to receive(:perform_async) store.publish(event) end + context 'when other workers subscribe to the same event' do + let(:store) do + described_class.new do |store| + store.subscribe worker, to: event_klass + store.subscribe another_worker, to: event_klass + store.subscribe unrelated_worker, to: another_event_klass + end + end + + it 'dispatches the event to each subscribed worker' do + expect(worker).to receive(:perform_async).with('TestEvent', data) + expect(another_worker).to receive(:perform_async).with('TestEvent', data) + expect(unrelated_worker).not_to receive(:perform_async) + + store.publish(event) + end + end + context 'when an error is raised' do before do allow(worker).to receive(:perform_async).and_raise(NoMethodError, 'the error message') -- GitLab From cbeb313a0a852b282d6645da9ad62d351a524ade Mon Sep 17 00:00:00 2001 From: Fabio Pitino Date: Fri, 12 Nov 2021 16:31:32 +0000 Subject: [PATCH 5/8] Document conditional dispatch feature --- doc/development/event_store.md | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/doc/development/event_store.md b/doc/development/event_store.md index 7ef0a49baac042..85dfdf8b8efaa2 100644 --- a/doc/development/event_store.md +++ b/doc/development/event_store.md @@ -115,7 +115,7 @@ subscriber's queue. This means that nothing else changes with regards to how subscribers work, as they are just Sidekiq workers. For example: if a worker (subscriber) fails to execute a job, the job is put -back into Sidekiq queue as many times as the retries set for the given worker. +back into Sidekiq to be retried. 
## EventStore advantages @@ -173,7 +173,7 @@ Publishing events with more properties provides the subscribers with the data they need in the first place. Otherwise subscribers have to fetch the additional data from the database. However, this can lead to continuous changes to the schema and possibly adding properties that may not represent the single source of truth. -It's best to use this technique as a performance optimization, like when an event has many +It's best to use this technique as a performance optimization. For example: when an event has many subscribers that all fetch the same data again from the database. ### Update the schema @@ -267,3 +267,26 @@ end Subscriptions are stored in memory when the Rails app is loaded and they are immediately frozen. It's not possible to modify subscriptions at runtime. + +### Conditional dispatch of events + +A subscription can specify a condition that determines when to accept an event: + +```ruby +store.subscribe ::MergeRequests::UpdateHeadPipelineWorker, + to: ::Ci::PipelineCreatedEvent, + if: -> (event) { event.data[:merge_request_id].present? } +``` + +This tells the event store to dispatch `Ci::PipelineCreatedEvent`s to the subscriber if +the condition is met. + +This technique can avoid scheduling Sidekiq jobs if the subscriber is interested in a +small subset of events. + +WARNING: +When using conditional dispatch, use only cheap conditions because they are +executed synchronously every time the given event is published. + +For complex conditions, it's best to subscribe to all the events and then handle the logic +in the `handle_event` method of the subscriber worker. 
-- GitLab From de0fbec8255180ace979e016e9b69e50180225f8 Mon Sep 17 00:00:00 2001 From: Fabio Pitino Date: Fri, 10 Dec 2021 11:18:10 +0000 Subject: [PATCH 6/8] Add specs for subscriber --- lib/gitlab/event_store.rb | 1 + lib/gitlab/event_store/event.rb | 2 +- spec/lib/gitlab/event_store/store_spec.rb | 35 ++++++++++++++++++++++- 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/lib/gitlab/event_store.rb b/lib/gitlab/event_store.rb index 578224c30ca0c3..3d7b6b27eb0b0e 100644 --- a/lib/gitlab/event_store.rb +++ b/lib/gitlab/event_store.rb @@ -37,5 +37,6 @@ def self.configure! store.subscribe ::MergeRequests::UpdateHeadPipelineWorker, to: ::Ci::PipelineCreatedEvent end end + private_class_method :configure! end end diff --git a/lib/gitlab/event_store/event.rb b/lib/gitlab/event_store/event.rb index e5b445ceb0988e..ee0c329b8e849f 100644 --- a/lib/gitlab/event_store/event.rb +++ b/lib/gitlab/event_store/event.rb @@ -46,7 +46,7 @@ def validate_schema!(data) end unless JSONSchemer.schema(schema).valid?(data.deep_stringify_keys) - raise Gitlab::EventStore::InvalidEvent, "Data for event #{self.class} does not match the defined schema: #{data}" + raise Gitlab::EventStore::InvalidEvent, "Data for event #{self.class} does not match the defined schema: #{schema}" end end end diff --git a/spec/lib/gitlab/event_store/store_spec.rb b/spec/lib/gitlab/event_store/store_spec.rb index bd5332b65e325a..046053b2a916ef 100644 --- a/spec/lib/gitlab/event_store/store_spec.rb +++ b/spec/lib/gitlab/event_store/store_spec.rb @@ -9,7 +9,14 @@ let(:worker) do stub_const('EventSubscriber', Class.new).tap do |klass| - klass.class_eval { include ApplicationWorker } + klass.class_eval do + include ApplicationWorker + include Gitlab::EventStore::Subscriber + + def handle_event(event) + event.data + end + end end end @@ -212,4 +219,30 @@ def schema end end end + + describe 'subscriber' do + let(:data) { { name: 'Bob', id: 123 } } + let(:event_name) { event.class.name } + 
let(:worker_instance) { worker.new } + + subject { worker_instance.perform(event_name, data) } + + it 'handles the event' do + expect(worker_instance).to receive(:handle_event).with(instance_of(event.class)) + + expect_any_instance_of(event.class) do |event| + expect(event).to receive(:data).and_return(data) + end + + subject + end + + context 'when the event name does not exist' do + let(:event_name) { 'UnknownClass' } + + it 'raises an error' do + expect { subject }.to raise_error(Gitlab::EventStore::InvalidEvent) + end + end + end end -- GitLab From 32f8f44adc5bb3a8bfffb71353d2ea7dd3fe3905 Mon Sep 17 00:00:00 2001 From: Fabio Pitino Date: Fri, 17 Dec 2021 08:56:12 +0000 Subject: [PATCH 7/8] Add spec for missing schema in Event class --- spec/lib/gitlab/event_store/event_spec.rb | 78 ++++++++++++----------- 1 file changed, 42 insertions(+), 36 deletions(-) diff --git a/spec/lib/gitlab/event_store/event_spec.rb b/spec/lib/gitlab/event_store/event_spec.rb index c9ab11059fec56..97f6870a5ece7a 100644 --- a/spec/lib/gitlab/event_store/event_spec.rb +++ b/spec/lib/gitlab/event_store/event_spec.rb @@ -3,55 +3,61 @@ require 'spec_helper' RSpec.describe Gitlab::EventStore::Event do - before do - stub_const('TestEvent', Class.new(described_class)) - - TestEvent.class_eval do - def schema - { - 'required' => ['project_id'], - 'type' => 'object', - 'properties' => { - 'project_id' => { 'type' => 'integer' }, - 'project_path' => { 'type' => 'string' } - } - } - end + let(:event_class) { stub_const('TestEvent', Class.new(described_class)) } + let(:event) { event_class.new(data: data) } + let(:data) { { project_id: 123, project_path: 'org/the-project' } } + + context 'when schema is not defined' do + it 'raises an error on initialization' do + expect { event }.to raise_error(NotImplementedError) end end - let(:event) { TestEvent.new(data: data) } - - describe 'schema validation' do - context 'when data matches the schema' do - let(:data) { { project_id: 123, project_path: 
'org/the-project' } } - - it 'initializes the event correctly' do - expect(event.data).to eq(data) + context 'when schema is defined' do + before do + event_class.class_eval do + def schema + { + 'required' => ['project_id'], + 'type' => 'object', + 'properties' => { + 'project_id' => { 'type' => 'integer' }, + 'project_path' => { 'type' => 'string' } + } + } + end end end - context 'when required properties are present as well as unknown properties' do - let(:data) { { project_id: 123, unknown_key: 'unknown_value' } } + describe 'schema validation' do + context 'when data matches the schema' do + it 'initializes the event correctly' do + expect(event.data).to eq(data) + end + end + + context 'when required properties are present as well as unknown properties' do + let(:data) { { project_id: 123, unknown_key: 'unknown_value' } } - it 'initializes the event correctly' do - expect(event.data).to eq(data) + it 'initializes the event correctly' do + expect(event.data).to eq(data) + end end - end - context 'when some properties are missing' do - let(:data) { { project_path: 'org/the-project' } } + context 'when some properties are missing' do + let(:data) { { project_path: 'org/the-project' } } - it 'expects all properties to be present' do - expect { event }.to raise_error(Gitlab::EventStore::InvalidEvent, /does not match the defined schema/) + it 'expects all properties to be present' do + expect { event }.to raise_error(Gitlab::EventStore::InvalidEvent, /does not match the defined schema/) + end end - end - context 'when data is not a Hash' do - let(:data) { 123 } + context 'when data is not a Hash' do + let(:data) { 123 } - it 'raises an error' do - expect { event }.to raise_error(Gitlab::EventStore::InvalidEvent, 'Event data must be a Hash') + it 'raises an error' do + expect { event }.to raise_error(Gitlab::EventStore::InvalidEvent, 'Event data must be a Hash') + end end end end -- GitLab From 5c7d20f5fa56c3ba647f540b342be1bcfa2f662e Mon Sep 17 00:00:00 2001 From: 
Fabio Pitino Date: Fri, 17 Dec 2021 11:16:20 +0000 Subject: [PATCH 8/8] Fix coverage error --- lib/gitlab/event_store/subscriber.rb | 2 +- spec/lib/gitlab/event_store/store_spec.rb | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/lib/gitlab/event_store/subscriber.rb b/lib/gitlab/event_store/subscriber.rb index e90597d1f95545..cf326d1f9e45eb 100644 --- a/lib/gitlab/event_store/subscriber.rb +++ b/lib/gitlab/event_store/subscriber.rb @@ -29,7 +29,7 @@ def perform(event_type, data) end def handle_event(event) - raise NotImplementedMethod, 'you must implement this methods in order to handle events' + raise NotImplementedError, 'you must implement this methods in order to handle events' end end end diff --git a/spec/lib/gitlab/event_store/store_spec.rb b/spec/lib/gitlab/event_store/store_spec.rb index 046053b2a916ef..711e1d5b4d5df3 100644 --- a/spec/lib/gitlab/event_store/store_spec.rb +++ b/spec/lib/gitlab/event_store/store_spec.rb @@ -22,13 +22,19 @@ def handle_event(event) let(:another_worker) do stub_const('AnotherEventSubscriber', Class.new).tap do |klass| - klass.class_eval { include ApplicationWorker } + klass.class_eval do + include ApplicationWorker + include Gitlab::EventStore::Subscriber + end end end let(:unrelated_worker) do stub_const('UnrelatedEventSubscriber', Class.new).tap do |klass| - klass.class_eval { include ApplicationWorker } + klass.class_eval do + include ApplicationWorker + include Gitlab::EventStore::Subscriber + end end end @@ -244,5 +250,13 @@ def schema expect { subject }.to raise_error(Gitlab::EventStore::InvalidEvent) end end + + context 'when the worker does not define handle_event method' do + let(:worker_instance) { another_worker.new } + + it 'raises an error' do + expect { subject }.to raise_error(NotImplementedError) + end + end end end -- GitLab