From eeec64d83b621d42a17e7a7207049fdc58c3519a Mon Sep 17 00:00:00 2001 From: Grzegorz Bizon Date: Fri, 21 Oct 2016 08:21:47 +0200 Subject: [PATCH 1/3] Add mixin for unique sidekiq workers using lease --- lib/gitlab/exclusive_lease.rb | 7 ++++++- lib/gitlab/worker/unique.rb | 29 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 lib/gitlab/worker/unique.rb diff --git a/lib/gitlab/exclusive_lease.rb b/lib/gitlab/exclusive_lease.rb index ffe49364379b..3329f3fc1bdc 100644 --- a/lib/gitlab/exclusive_lease.rb +++ b/lib/gitlab/exclusive_lease.rb @@ -27,7 +27,7 @@ module Gitlab # on begin/ensure blocks to cancel a lease, because the 'ensure' does # not always run. Think of 'kill -9' from the Unicorn master for # instance. - # + # # If you find that leases are getting in your way, ask yourself: would # it be enough to lower the lease timeout? Another thing that might be # appropriate is to only use a lease for bulk/automated operations, and @@ -49,6 +49,11 @@ def try_obtain end # No #cancel method. See comments above! + # TODO, consider adding this + # + def cancel! + Gitlab::Redis.with { |redis| redis.del(redis_key) } + end private diff --git a/lib/gitlab/worker/unique.rb b/lib/gitlab/worker/unique.rb new file mode 100644 index 000000000000..708d6eb2e05d --- /dev/null +++ b/lib/gitlab/worker/unique.rb @@ -0,0 +1,29 @@ +require 'digest' + +module Gitlab + module Worker + module Unique + def unique_processing(*args) + key, timeout = uuid(args), 1.hour.to_i + + Gitlab::ExclusiveLease.new(key, timeout: timeout).tap do |lease| + break unless lease.try_obtain + + begin + yield + rescue + raise + ensure + lease.cancel! + end + end + end + + private + + def uuid(args) + Digest::SHA1.hexdigest(self.class.name + args.to_json) + end + end + end +end -- GitLab From 04f731f3ada7c8ff7832275f6838ea687c7f6bbe Mon Sep 17 00:00:00 2001 From: Grzegorz Bizon Date: Fri, 21 Oct 2016 08:43:36 +0200 Subject: [PATCH 2/3] Add unique pipeline processing rules to workers --- app/workers/pipeline_process_worker.rb | 7 +++++-- app/workers/pipeline_update_worker.rb | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb index f44227d7086b..69d2e7cf0105 100644 --- a/app/workers/pipeline_process_worker.rb +++ b/app/workers/pipeline_process_worker.rb @@ -1,10 +1,13 @@ class PipelineProcessWorker include Sidekiq::Worker + include Gitlab::Worker::Unique sidekiq_options queue: :default def perform(pipeline_id) - Ci::Pipeline.find_by(id: pipeline_id) - .try(:process!) + unique_processing(pipeline_id) do + Ci::Pipeline.find_by(id: pipeline_id) + .try(:process!) + end end end diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb index 44a7f24e4013..d2d221950679 100644 --- a/app/workers/pipeline_update_worker.rb +++ b/app/workers/pipeline_update_worker.rb @@ -1,10 +1,13 @@ class PipelineUpdateWorker include Sidekiq::Worker + include Gitlab::Worker::Unique sidekiq_options queue: :default def perform(pipeline_id) - Ci::Pipeline.find_by(id: pipeline_id) - .try(:update_status) + unique_processing(pipeline_id) do + Ci::Pipeline.find_by(id: pipeline_id) + .try(:update_status) + end end end -- GitLab From 1dc34b714cc18264e73a8e4d722b067d155d8ab1 Mon Sep 17 00:00:00 2001 From: Grzegorz Bizon Date: Fri, 21 Oct 2016 09:44:35 +0200 Subject: [PATCH 3/3] Schedule pipeline worker only when it is unique --- app/models/commit_status.rb | 7 ++---- app/workers/pipeline_process_worker.rb | 9 +++---- app/workers/pipeline_update_worker.rb | 9 +++---- lib/gitlab/worker/unique.rb | 35 ++++++++++++++------------ 4 files changed, 27 insertions(+), 33 deletions(-) diff --git a/app/models/commit_status.rb b/app/models/commit_status.rb index 7b554be4f9a5..4b6b996a4621 100644 --- a/app/models/commit_status.rb +++ b/app/models/commit_status.rb @@ -90,11 +90,8 @@ class CommitStatus < ActiveRecord::Base commit_status.run_after_commit do pipeline.try do |pipeline| - if complete? - PipelineProcessWorker.perform_async(pipeline.id) - else - PipelineUpdateWorker.perform_async(pipeline.id) - end + worker = complete? ? PipelineProcessWorker : PipelineUpdateWorker + Gitlab::Worker::Unique.new(worker, pipeline.id).schedule! end end end diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb index 69d2e7cf0105..b89743272f7e 100644 --- a/app/workers/pipeline_process_worker.rb +++ b/app/workers/pipeline_process_worker.rb @@ -1,13 +1,10 @@ class PipelineProcessWorker include Sidekiq::Worker - include Gitlab::Worker::Unique - sidekiq_options queue: :default def perform(pipeline_id) - unique_processing(pipeline_id) do - Ci::Pipeline.find_by(id: pipeline_id) - .try(:process!) - end + Gitlab::Worker::Unique.new(self.class, pipeline_id).release! + + Ci::Pipeline.find_by(id: pipeline_id).try(:process!) end end diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb index d2d221950679..e135e230cee8 100644 --- a/app/workers/pipeline_update_worker.rb +++ b/app/workers/pipeline_update_worker.rb @@ -1,13 +1,10 @@ class PipelineUpdateWorker include Sidekiq::Worker - include Gitlab::Worker::Unique - sidekiq_options queue: :default def perform(pipeline_id) - unique_processing(pipeline_id) do - Ci::Pipeline.find_by(id: pipeline_id) - .try(:update_status) - end + Gitlab::Worker::Unique.new(self.class, pipeline_id).release! + + Ci::Pipeline.find_by(id: pipeline_id).try(:update_status) end end diff --git a/lib/gitlab/worker/unique.rb b/lib/gitlab/worker/unique.rb index 708d6eb2e05d..9adbdaa72f45 100644 --- a/lib/gitlab/worker/unique.rb +++ b/lib/gitlab/worker/unique.rb @@ -2,27 +2,30 @@ module Gitlab module Worker - module Unique - def unique_processing(*args) - key, timeout = uuid(args), 1.hour.to_i + class Unique + def initialize(worker, *args) + @worker = worker + @args = args + end - Gitlab::ExclusiveLease.new(key, timeout: timeout).tap do |lease| - break unless lease.try_obtain + def uuid + @uuid ||= Digest::SHA1 + .hexdigest(@worker.name + @args.to_json) + end - begin - yield - rescue - raise - ensure - lease.cancel! - end - end + def lease + @lease ||= Gitlab::ExclusiveLease + .new(uuid, timeout: 1.hour.to_i) end - private + def schedule! + if lease.try_obtain + @worker.perform_async(*@args) + end + end - def uuid(args) - Digest::SHA1.hexdigest(self.class.name + args.to_json) + def release! + lease.cancel! end end end -- GitLab