diff --git a/app/models/ci/build.rb b/app/models/ci/build.rb index 59bff4e2d2bad6e2b8b435f4e490614d480cbe9a..c7df5c4c68bdc2a1acc6d9302c9451f712dd1dec 100644 --- a/app/models/ci/build.rb +++ b/app/models/ci/build.rb @@ -40,6 +40,7 @@ class Build < CommitStatus }.freeze has_one :deployment, as: :deployable, class_name: 'Deployment' + has_one :job_lock, class_name: 'Ci::JobLock', foreign_key: :job_id has_many :trace_sections, class_name: 'Ci::BuildTraceSection' has_many :trace_chunks, class_name: 'Ci::BuildTraceChunk', foreign_key: :build_id @@ -60,6 +61,7 @@ class Build < CommitStatus delegate :terminal_specification, to: :runner_session, allow_nil: true delegate :gitlab_deploy_token, to: :project delegate :trigger_short_token, to: :trigger_request, allow_nil: true + delegate :try_lock, to: :job_lock ## # Since Gitlab 11.5, deployments records started being created right after @@ -257,6 +259,8 @@ def retry(build, current_user) end after_transition any => [:success, :failed, :canceled] do |build| + build.job_lock&.release + build.run_after_commit do BuildFinishedWorker.perform_async(id) end @@ -347,6 +351,10 @@ def schedulable? self.when == 'delayed' && options[:start_in].present? end + def lockable? + job_lock.present? + end + def options_scheduled_at ChronicDuration.parse(options[:start_in])&.seconds&.from_now end @@ -420,6 +428,22 @@ def has_environment? environment.present? end + def has_lock? + options[:lock].present? + end + + def lock_key + options.fetch(:lock, nil) + end + + def expanded_lock_key + return unless has_lock? + + strong_memoize(:expanded_lock_key) do + ExpandVariables.expand(lock_key, -> { simple_variables }) + end + end + def starts_environment? has_environment? && self.environment_action == 'start' end diff --git a/app/models/ci/job_lock.rb b/app/models/ci/job_lock.rb new file mode 100644 index 0000000000000000000000000000000000000000..7996b155bd4726ca4991a0497a4c9f495e06e2da --- /dev/null +++ b/app/models/ci/job_lock.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +module Ci + class JobLock < ApplicationRecord + self.table_name = 'ci_job_locks' + + belongs_to :ci_semaphore, class_name: 'Ci::ProjectSemaphore', + foreign_key: :semaphore_id, inverse_of: :job_locks + + belongs_to :job, class_name: 'Ci::Build', inverse_of: :job_lock + + delegate :under_limit?, :unlock_next, to: :ci_semaphore + + state_machine :status, initial: :created do + event :obtain do + transition %i[created blocked] => :locking + end + + event :wait do + transition created: :blocked + end + + event :release do + transition any - [:released] => :released + end + + before_transition blocked: :locking do |job_lock| + job_lock.blocked_duration = Time.now - job_lock.updated_at + end + + before_transition any => :released do |job_lock| + job_lock.unlock_next(from: job_lock) + end + + after_transition %i[created blocked] => :locking do |job_lock| + job_lock.job.enqueue + end + end + + enum status: { + created: 0, + locking: 1, + blocked: 2, + released: 3, + } + + def try_lock + under_limit? ? obtain : wait + end + end +end diff --git a/app/models/ci/project_semaphore.rb b/app/models/ci/project_semaphore.rb new file mode 100644 index 0000000000000000000000000000000000000000..5f7064269b74a69f8a35af89ced68617178d08ad --- /dev/null +++ b/app/models/ci/project_semaphore.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module Ci + class ProjectSemaphore < ApplicationRecord + self.table_name = 'ci_project_semaphores' + + belongs_to :project, inverse_of: :ci_semaphores + + has_many :job_locks, class_name: 'Ci::JobLock', foreign_key: :semaphore_id + + validates :key, length: { minimum: 1, maximum: 255 } + + def under_limit? + job_locks.locking.count < concurrency + end + + def unlock_next(from:) + next_blocked(from.id)&.obtain + end + + def next_blocked(from_id) + job_locks.blocked.where('id > ?', from_id).order(:id).take + end + end +end diff --git a/app/models/project.rb b/app/models/project.rb index 9a208b0058b04dcbad8a1078074afac33ba268c2..7122b74c782399b557f30ed5ab78820bfb1d02ad 100644 --- a/app/models/project.rb +++ b/app/models/project.rb @@ -281,6 +281,7 @@ class Project < ApplicationRecord has_many :builds, class_name: 'Ci::Build', inverse_of: :project, dependent: :destroy # rubocop:disable Cop/ActiveRecordDependent has_many :build_trace_section_names, class_name: 'Ci::BuildTraceSectionName' has_many :build_trace_chunks, class_name: 'Ci::BuildTraceChunk', through: :builds, source: :trace_chunks + has_many :ci_semaphores, class_name: 'Ci::ProjectSemaphore' has_many :job_artifacts, class_name: 'Ci::JobArtifact' has_many :runner_projects, class_name: 'Ci::RunnerProject', inverse_of: :project has_many :runners, through: :runner_projects, source: :runner, class_name: 'Ci::Runner' diff --git a/app/services/ci/process_build_service.rb b/app/services/ci/process_build_service.rb index eb92c7d1a271dcac32988acec813136b9287dff0..f5a00ea63ba9d8f42705d43a420916b8f1a0bcb6 100644 --- a/app/services/ci/process_build_service.rb +++ b/app/services/ci/process_build_service.rb @@ -4,7 +4,9 @@ module Ci class ProcessBuildService < BaseService def execute(build, current_status) if valid_statuses_for_when(build.when).include?(current_status) - if build.schedulable? + if build.lockable? + build.try_lock + elsif build.schedulable? build.schedule elsif build.action? build.actionize diff --git a/db/migrate/20191120184725_ci_project_semaphores.rb b/db/migrate/20191120184725_ci_project_semaphores.rb new file mode 100644 index 0000000000000000000000000000000000000000..40c6392fc7d20e07efd30e5ddd387c046769f6cc --- /dev/null +++ b/db/migrate/20191120184725_ci_project_semaphores.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +class CiProjectSemaphores < ActiveRecord::Migration[5.2] + DOWNTIME = false + + def change + create_table :ci_project_semaphores do |t| + t.references :project, null: false, index: false, foreign_key: { on_delete: :cascade } + t.string :key, null: false + t.integer :concurrency, null: false, default: 1 + t.index %i[project_id key], unique: true + t.timestamps_with_timezone + end + + create_table :ci_job_locks do |t| + t.references :semaphore, null: false, index: false, foreign_key: { to_table: :ci_project_semaphores, on_delete: :cascade } + t.references :job, null: false, index: false, foreign_key: { to_table: :ci_builds, on_delete: :cascade } + t.integer :status, limit: 2, null: false + t.integer :blocked_duration + t.index %i[semaphore_id job_id], unique: true + t.timestamps_with_timezone + end + end +end diff --git a/db/schema.rb b/db/schema.rb index 21d045be380c2e8cdb8134dbbb0f026f34e33b48..7c12ac5c48db895e3e7ccb0d184e7c596845ddce 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2019_11_18_182722) do +ActiveRecord::Schema.define(version: 2019_11_20_184725) do # These are extensions that must be enabled in order to support this database enable_extension "pg_trgm" @@ -755,6 +755,16 @@ t.index ["project_id"], name: "index_ci_job_artifacts_on_project_id_for_security_reports", where: "(file_type = ANY (ARRAY[5, 6, 7, 8]))" end + create_table "ci_job_locks", force: :cascade do |t| + t.bigint "semaphore_id", null: false + t.bigint "job_id", null: false + t.integer "status", limit: 2, null: false + t.integer "blocked_duration" + t.datetime_with_timezone "created_at", null: false + t.datetime_with_timezone "updated_at", null: false + t.index ["semaphore_id", "job_id"], name: "index_ci_job_locks_on_semaphore_id_and_job_id", unique: true + end + create_table "ci_job_variables", force: :cascade do |t| t.string "key", null: false t.text "encrypted_value" @@ -855,6 +865,15 @@ t.index ["user_id"], name: "index_ci_pipelines_on_user_id" end + create_table "ci_project_semaphores", force: :cascade do |t| + t.bigint "project_id", null: false + t.string "key", null: false + t.integer "concurrency", default: 1, null: false + t.datetime_with_timezone "created_at", null: false + t.datetime_with_timezone "updated_at", null: false + t.index ["project_id", "key"], name: "index_ci_project_semaphores_on_project_id_and_key", unique: true + end + create_table "ci_runner_namespaces", id: :serial, force: :cascade do |t| t.integer "runner_id" t.integer "namespace_id" @@ -4278,6 +4297,8 @@ add_foreign_key "ci_group_variables", "namespaces", column: "group_id", name: "fk_33ae4d58d8", on_delete: :cascade add_foreign_key "ci_job_artifacts", "ci_builds", column: "job_id", on_delete: :cascade add_foreign_key "ci_job_artifacts", "projects", on_delete: :cascade + add_foreign_key "ci_job_locks", "ci_builds", column: "job_id", on_delete: :cascade + add_foreign_key "ci_job_locks", "ci_project_semaphores", column: "semaphore_id", on_delete: :cascade add_foreign_key "ci_job_variables", "ci_builds", column: "job_id", on_delete: :cascade add_foreign_key "ci_pipeline_chat_data", "chat_names", on_delete: :cascade add_foreign_key "ci_pipeline_chat_data", "ci_pipelines", column: "pipeline_id", on_delete: :cascade @@ -4290,6 +4311,7 @@ add_foreign_key "ci_pipelines", "external_pull_requests", name: "fk_190998ef09", on_delete: :nullify add_foreign_key "ci_pipelines", "merge_requests", name: "fk_a23be95014", on_delete: :cascade add_foreign_key "ci_pipelines", "projects", name: "fk_86635dbd80", on_delete: :cascade + add_foreign_key "ci_project_semaphores", "projects", on_delete: :cascade add_foreign_key "ci_runner_namespaces", "ci_runners", column: "runner_id", on_delete: :cascade add_foreign_key "ci_runner_namespaces", "namespaces", on_delete: :cascade add_foreign_key "ci_runner_projects", "projects", name: "fk_4478a6f1e4", on_delete: :cascade diff --git a/lib/gitlab/ci/config/entry/job.rb b/lib/gitlab/ci/config/entry/job.rb index c75ae87a98555bcb22ed446fb586f4918df8d148..7674479e710b5679294a7bf4d1ff530f859aa140 100644 --- a/lib/gitlab/ci/config/entry/job.rb +++ b/lib/gitlab/ci/config/entry/job.rb @@ -16,7 +16,8 @@ class Job < ::Gitlab::Config::Entry::Node ALLOWED_KEYS = %i[tags script only except rules type image services allow_failure type stage when start_in artifacts cache dependencies before_script needs after_script variables - environment coverage retry parallel extends interruptible timeout].freeze + environment coverage retry parallel extends interruptible timeout + lock].freeze REQUIRED_BY_NEEDS = %i[stage].freeze @@ -51,6 +52,7 @@ class Job < ::Gitlab::Config::Entry::Node validates :dependencies, array_of_strings: true validates :extends, array_of_strings_or_string: true validates :rules, array_of_hashes: true + validates :lock, type: String end validates :start_in, duration: { limit: '1 day' }, if: :delayed? @@ -151,7 +153,7 @@ class Job < ::Gitlab::Config::Entry::Node attributes :script, :tags, :allow_failure, :when, :dependencies, :needs, :retry, :parallel, :extends, :start_in, :rules, - :interruptible, :timeout + :interruptible, :timeout, :lock def self.matching?(name, config) !name.to_s.start_with?('.') && @@ -231,7 +233,8 @@ def to_hash artifacts: artifacts_value, after_script: after_script_value, ignore: ignored?, - needs: needs_defined? ? needs_value : nil } + needs: needs_defined? ? needs_value : nil, + lock: lock } end end end diff --git a/lib/gitlab/ci/pipeline/seed/build.rb b/lib/gitlab/ci/pipeline/seed/build.rb index dce56b226664982068a0ef546111611350752dd6..6fc8235a91a39da6058278bd42d1bfab97657c17 100644 --- a/lib/gitlab/ci/pipeline/seed/build.rb +++ b/lib/gitlab/ci/pipeline/seed/build.rb @@ -78,6 +78,7 @@ def to_resource else ::Ci::Build.new(attributes).tap do |job| job.deployment = Seed::Deployment.new(job).to_resource + job.job_lock = Seed::Build::Lock.new(job).to_resource end end end diff --git a/lib/gitlab/ci/pipeline/seed/build/lock.rb b/lib/gitlab/ci/pipeline/seed/build/lock.rb new file mode 100644 index 0000000000000000000000000000000000000000..88e7d8352dfb000559b686dc49d133e28f5bd554 --- /dev/null +++ b/lib/gitlab/ci/pipeline/seed/build/lock.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module Gitlab + module Ci + module Pipeline + module Seed + class Build + class Lock < Seed::Base + attr_reader :job + + def initialize(job) + @job = job + end + + def to_resource + return unless job.has_lock? + + semaphore = find_or_create_semaphore + + unless semaphore.valid? && semaphore.persisted? + # TODO: Gitlab::Sentry.track_exception or invalid parameters + return + end + + semaphore.job_locks.build(job: job) + end + + private + + def find_or_create_semaphore + # TODO: Gitlab::OptimisticLocking to avoid race condition + ::Ci::ProjectSemaphore.find_or_create_by(attributes_for_semaphore) + end + + def attributes_for_semaphore + { + project: job.project, + key: job.expanded_lock_key + } + end + end + end + end + end + end +end diff --git a/lib/gitlab/ci/status/build/blocked_by_semaphore.rb b/lib/gitlab/ci/status/build/blocked_by_semaphore.rb new file mode 100644 index 0000000000000000000000000000000000000000..1a69c517d8198ec2224e4964eea5b97eaf2ce5b4 --- /dev/null +++ b/lib/gitlab/ci/status/build/blocked_by_semaphore.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +module Gitlab + module Ci + module Status + module Build + class BlockedBySemaphore < Status::Extended + def text + s_('CiStatusText|blocked by semaphore') + end + + def label + s_('CiStatusLabel|blocked by semaphore') + end + + def icon + 'status_manual' + end + + def group + 'favicon_status_manual' + end + + def self.matches?(subject, user) + subject.job_lock&.blocked? # TODO: Hyper slow. User compond status. + end + end + end + end + end +end diff --git a/lib/gitlab/ci/status/build/factory.rb b/lib/gitlab/ci/status/build/factory.rb index 96d058428385310d4bb6ce70f7200ceaef8e52cb..b181a07cca23937dc2ee58d1285b6baea7d5cb72 100644 --- a/lib/gitlab/ci/status/build/factory.rb +++ b/lib/gitlab/ci/status/build/factory.rb @@ -8,6 +8,7 @@ class Factory < Status::Factory def self.extended_statuses [[Status::Build::Erased, Status::Build::Scheduled, + Status::Build::BlockedBySemaphore, Status::Build::Manual, Status::Build::Canceled, Status::Build::Created, diff --git a/lib/gitlab/ci/status/pipeline/blocked_by_semaphore.rb b/lib/gitlab/ci/status/pipeline/blocked_by_semaphore.rb new file mode 100644 index 0000000000000000000000000000000000000000..a7acded59291014ea77531cc68b9551df05b963d --- /dev/null +++ b/lib/gitlab/ci/status/pipeline/blocked_by_semaphore.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +module Gitlab + module Ci + module Status + module Pipeline + class BlockedBySemaphore < Status::Extended + def text + s_('CiStatusText|blocked by semaphore') + end + + def label + s_('CiStatusLabel|blocked by semaphore') + end + + def icon + 'status_manual' + end + + def group + 'favicon_status_manual' + end + + def self.matches?(subject, user) + subject.builds.any? { |build| build.job_lock&.blocked? } # TODO: Hyper slow. User compond status. + end + end + end + end + end +end diff --git a/lib/gitlab/ci/status/pipeline/factory.rb b/lib/gitlab/ci/status/pipeline/factory.rb index 5d1a8bbd9240801c572627b6d963d8e7aa57c954..37cc553204e2fe37cce956c6b4a5674217208636 100644 --- a/lib/gitlab/ci/status/pipeline/factory.rb +++ b/lib/gitlab/ci/status/pipeline/factory.rb @@ -8,7 +8,8 @@ class Factory < Status::Factory def self.extended_statuses [[Status::SuccessWarning, Status::Pipeline::Delayed, - Status::Pipeline::Blocked]] + Status::Pipeline::Blocked, + Status::Pipeline::BlockedBySemaphore]] end def self.common_helpers diff --git a/lib/gitlab/ci/status/stage/blocked_by_semaphore.rb b/lib/gitlab/ci/status/stage/blocked_by_semaphore.rb new file mode 100644 index 0000000000000000000000000000000000000000..1d55daf0da60617d9706ca67411edbb81a01fe35 --- /dev/null +++ b/lib/gitlab/ci/status/stage/blocked_by_semaphore.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +module Gitlab + module Ci + module Status + module Stage + class BlockedBySemaphore < Status::Extended + def text + s_('CiStatusText|blocked by semaphore') + end + + def label + s_('CiStatusLabel|blocked by semaphore') + end + + def icon + 'status_manual' + end + + def group + 'favicon_status_manual' + end + + def self.matches?(subject, user) + subject.builds.any? { |build| build.job_lock&.blocked? } # TODO: Hyper slow. User compond status. + end + end + end + end + end +end diff --git a/lib/gitlab/ci/status/stage/factory.rb b/lib/gitlab/ci/status/stage/factory.rb index e50b08537255c663d6332ea6bfd38f99278a2d04..44b4547a57c3d0d2c3ca010e61afd46f7d89ffce 100644 --- a/lib/gitlab/ci/status/stage/factory.rb +++ b/lib/gitlab/ci/status/stage/factory.rb @@ -7,7 +7,8 @@ module Stage class Factory < Status::Factory def self.extended_statuses [[Status::SuccessWarning], - [Status::Stage::PlayManual]] + [Status::Stage::PlayManual], + [Status::Stage::BlockedBySemaphore]] end def self.common_helpers diff --git a/lib/gitlab/ci/yaml_processor.rb b/lib/gitlab/ci/yaml_processor.rb index 833c545fc5b1313bf1b6ff895ee36458a49b54f7..8a167bf30c0c73a00bdb12af50d705e409f45927 100644 --- a/lib/gitlab/ci/yaml_processor.rb +++ b/lib/gitlab/ci/yaml_processor.rb @@ -59,7 +59,8 @@ def build_attributes(name) instance: job[:instance], start_in: job[:start_in], trigger: job[:trigger], - bridge_needs: job.dig(:needs, :bridge)&.first + bridge_needs: job.dig(:needs, :bridge)&.first, + lock: job[:lock], }.compact }.compact end diff --git a/spec/services/ci/create_pipeline_service_spec.rb b/spec/services/ci/create_pipeline_service_spec.rb index de0f484121524078be8b5417917b2415f2b5bc2d..03b1e05d016a750de3a3829d3a22b2159f732e75 100644 --- a/spec/services/ci/create_pipeline_service_spec.rb +++ b/spec/services/ci/create_pipeline_service_spec.rb @@ -885,6 +885,162 @@ def previous_commit_sha_from_ref(ref) end end + context 'with lock', :sidekiq_inline do + context 'when Limit per job' do + before do + config = YAML.dump( + test: { stage: 'test', script: 'ls' }, + deploy: { stage: 'deploy', script: 'ls', lock: 'tmp-key' } + ) + + stub_ci_pipeline_yaml_file(config) + end + + it 'has the deploy job with locking status' do + result = execute_service + test_job = result.builds.find_by_name!(:test) + deploy_job = result.builds.find_by_name!(:deploy) + + expect(deploy_job).to be_created + expect(deploy_job.job_lock).to be_created + + # when test job finished + test_job.success! + deploy_job.reload + + expect(deploy_job).to be_pending + expect(deploy_job.job_lock).to be_locking + + # when deploy job finished + deploy_job.success! + deploy_job.reload + + expect(deploy_job).to be_success + expect(deploy_job.job_lock).to be_released + end + + context 'when the other job has already obtained the lock' do + let(:other_job) { create(:ci_build, name: 'other') } + + before do + ci_semaphore = project.ci_semaphores.create!(key: 'tmp-key') + ci_semaphore.job_locks.create!(job: other_job).obtain! + other_job.reload + end + + it 'has the deploy job with blocked status' do + result = execute_service + test_job = result.builds.find_by_name!(:test) + deploy_job = result.builds.find_by_name!(:deploy) + + expect(deploy_job).to be_created + expect(deploy_job.job_lock).to be_created + + # when test job finished + test_job.success! + deploy_job.reload + + expect(deploy_job).to be_created + expect(deploy_job.job_lock).to be_blocked + + # when other job finished + other_job.success! + other_job.reload + deploy_job.reload + + expect(other_job).to be_success + expect(other_job.job_lock).to be_released + expect(deploy_job).to be_pending + expect(deploy_job.job_lock).to be_locking + + # when deploy job finished + deploy_job.success! + deploy_job.reload + + expect(deploy_job).to be_success + expect(deploy_job.job_lock).to be_released + end + end + end + + context 'when Limit per job' do + before do + config = YAML.dump( + test: { stage: 'test', script: 'ls' }, + deploy: { stage: 'deploy', script: 'ls', lock: '$CI_JOB_NAME', environment: 'prd' } + ) + + stub_ci_pipeline_yaml_file(config) + end + + it 'persists the association correctly' do + result = execute_service + test_job = result.builds.find_by_name!(:test) + deploy_job = result.builds.find_by_name!(:deploy) + project_semaphore = project.ci_semaphores.find_by_key!('deploy') + + expect(result).to be_persisted + expect(test_job).not_to be_has_lock + expect(deploy_job).to be_has_lock + expect(deploy_job.lock_key).to eq('$CI_JOB_NAME') + expect(project.ci_semaphores.count).to eq(1) + expect(project_semaphore.job_locks.count).to eq(1) + expect(test_job.job_lock).not_to be_present + expect(deploy_job.job_lock).to be_present + expect(deploy_job.job_lock.ci_semaphore).to eq(project_semaphore) + end + end + + context 'when Limit per job per branch' do + before do + config = YAML.dump( + test: { + script: 'ls', + lock: '$CI_COMMIT_REF_NAME:$CI_JOB_NAME' + } + ) + + stub_ci_pipeline_yaml_file(config) + end + + it 'persists lock attribute to build option' do + result = execute_service + test_job = result.builds.find_by_name!(:test) + + expect(result).to be_persisted + expect(test_job).to be_lockable + expect(project.ci_semaphores.exists?(key: 'master:test')).to eq(true) + end + end + + context 'when Limit per environment' do + before do + config = YAML.dump( + test: { + script: 'ls', + environment: 'prd', + lock: '$CI_ENVIRONMENT_NAME' # TODO: This needs to fetch `persisted_environment_variables` in `simple_variables` + } + ) + + stub_ci_pipeline_yaml_file(config) + end + + it 'persists lock attribute to build option' do + result = execute_service + test_job = result.builds.find_by_name!(:test) + + expect(result).to be_persisted + expect(test_job).to be_lockable + expect(project.ci_semaphores.exist?(key: 'prd')).to eq(true) + end + end + + context 'when two same locks exist in the same pipeline' do + # TODO: Test edge case + end + end + shared_examples 'when ref is protected' do let(:user) { create(:user) }