summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrzegorz Bizon <grzesiek.bizon@gmail.com>2016-10-21 09:44:35 +0200
committerGrzegorz Bizon <grzesiek.bizon@gmail.com>2016-10-21 09:44:35 +0200
commit1dc34b714cc18264e73a8e4d722b067d155d8ab1 (patch)
tree2310448800d21ce631cbf67d744352fdd6a9fbcd
parent04f731f3ada7c8ff7832275f6838ea687c7f6bbe (diff)
downloadgitlab-ce-fix/improve-concurrent-pipeline-processing.tar.gz
Schedule pipeline worker only when it is uniquefix/improve-concurrent-pipeline-processing
-rw-r--r--app/models/commit_status.rb7
-rw-r--r--app/workers/pipeline_process_worker.rb9
-rw-r--r--app/workers/pipeline_update_worker.rb9
-rw-r--r--lib/gitlab/worker/unique.rb35
4 files changed, 27 insertions, 33 deletions
diff --git a/app/models/commit_status.rb b/app/models/commit_status.rb
index 7b554be4f9a..4b6b996a462 100644
--- a/app/models/commit_status.rb
+++ b/app/models/commit_status.rb
@@ -90,11 +90,8 @@ class CommitStatus < ActiveRecord::Base
commit_status.run_after_commit do
pipeline.try do |pipeline|
- if complete?
- PipelineProcessWorker.perform_async(pipeline.id)
- else
- PipelineUpdateWorker.perform_async(pipeline.id)
- end
+ worker = complete? ? PipelineProcessWorker : PipelineUpdateWorker
+ Gitlab::Worker::Unique.new(worker, pipeline.id).schedule!
end
end
end
diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb
index 69d2e7cf010..b89743272f7 100644
--- a/app/workers/pipeline_process_worker.rb
+++ b/app/workers/pipeline_process_worker.rb
@@ -1,13 +1,10 @@
class PipelineProcessWorker
include Sidekiq::Worker
- include Gitlab::Worker::Unique
-
sidekiq_options queue: :default
def perform(pipeline_id)
- unique_processing(pipeline_id) do
- Ci::Pipeline.find_by(id: pipeline_id)
- .try(:process!)
- end
+ Gitlab::Worker::Unique.new(self.class, pipeline_id).release!
+
+ Ci::Pipeline.find_by(id: pipeline_id).try(:process!)
end
end
diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb
index d2d22195067..e135e230cee 100644
--- a/app/workers/pipeline_update_worker.rb
+++ b/app/workers/pipeline_update_worker.rb
@@ -1,13 +1,10 @@
class PipelineUpdateWorker
include Sidekiq::Worker
- include Gitlab::Worker::Unique
-
sidekiq_options queue: :default
def perform(pipeline_id)
- unique_processing(pipeline_id) do
- Ci::Pipeline.find_by(id: pipeline_id)
- .try(:update_status)
- end
+ Gitlab::Worker::Unique.new(self.class, pipeline_id).release!
+
+ Ci::Pipeline.find_by(id: pipeline_id).try(:update_status)
end
end
diff --git a/lib/gitlab/worker/unique.rb b/lib/gitlab/worker/unique.rb
index 708d6eb2e05..9adbdaa72f4 100644
--- a/lib/gitlab/worker/unique.rb
+++ b/lib/gitlab/worker/unique.rb
@@ -2,27 +2,30 @@ require 'digest'
module Gitlab
module Worker
- module Unique
- def unique_processing(*args)
- key, timeout = uuid(args), 1.hour.to_i
+ class Unique
+ def initialize(worker, *args)
+ @worker = worker
+ @args = args
+ end
- Gitlab::ExclusiveLease.new(key, timeout: timeout).tap do |lease|
- break unless lease.try_obtain
+ def uuid
+ @uuid ||= Digest::SHA1
+ .hexdigest(@worker.name + @args.to_json)
+ end
- begin
- yield
- rescue
- raise
- ensure
- lease.cancel!
- end
- end
+ def lease
+ @lease ||= Gitlab::ExclusiveLease
+ .new(uuid, timeout: 1.hour.to_i)
end
- private
+ def schedule!
+ if lease.try_obtain
+ @worker.perform_async(*@args)
+ end
+ end
- def uuid(args)
- Digest::SHA1.hexdigest(self.class.name + args.to_json)
+ def release!
+ lease.cancel!
end
end
end