summaryrefslogtreecommitdiff
path: root/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb')
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb73
1 files changed, 39 insertions, 34 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
index aeb58d7c153..e63164efc94 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
@@ -64,9 +64,9 @@ module Gitlab
Sidekiq.redis do |redis|
redis.multi do |multi|
- redis.set(idempotency_key, jid, ex: expiry, nx: true)
- read_wal_locations = check_existing_wal_locations!(redis, expiry)
- read_jid = redis.get(idempotency_key)
+ multi.set(idempotency_key, jid, ex: expiry, nx: true)
+ read_wal_locations = check_existing_wal_locations!(multi, expiry)
+ read_jid = multi.get(idempotency_key)
end
end
@@ -81,9 +81,9 @@ module Gitlab
return unless job_wal_locations.present?
Sidekiq.redis do |redis|
- redis.multi do
+ redis.multi do |multi|
job_wal_locations.each do |connection_name, location|
- redis.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL])
+ multi.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL])
end
end
end
@@ -96,9 +96,9 @@ module Gitlab
read_wal_locations = {}
Sidekiq.redis do |redis|
- redis.multi do
+ redis.multi do |multi|
job_wal_locations.keys.each do |connection_name|
- read_wal_locations[connection_name] = redis.lindex(wal_location_key(connection_name), 0)
+ read_wal_locations[connection_name] = multi.lindex(wal_location_key(connection_name), 0)
end
end
end
@@ -110,8 +110,8 @@ module Gitlab
def delete!
Sidekiq.redis do |redis|
redis.multi do |multi|
- redis.del(idempotency_key)
- delete_wal_locations!(redis)
+ multi.del(idempotency_key)
+ delete_wal_locations!(multi)
end
end
end
@@ -140,13 +140,14 @@ module Gitlab
def idempotent?
return false unless worker_klass
return false unless worker_klass.respond_to?(:idempotent?)
+ return false unless preserve_wal_location? || !worker_klass.utilizes_load_balancing_capabilities?
worker_klass.idempotent?
end
private
- attr_accessor :existing_wal_locations
+ attr_writer :existing_wal_locations
attr_reader :queue_name, :job
attr_writer :existing_jid
@@ -154,8 +155,33 @@ module Gitlab
@worker_klass ||= worker_class_name.to_s.safe_constantize
end
+ def delete_wal_locations!(redis)
+ job_wal_locations.keys.each do |connection_name|
+ redis.del(wal_location_key(connection_name))
+ redis.del(existing_wal_location_key(connection_name))
+ end
+ end
+
+ def check_existing_wal_locations!(redis, expiry)
+ read_wal_locations = {}
+
+ job_wal_locations.each do |connection_name, location|
+ key = existing_wal_location_key(connection_name)
+ redis.set(key, location, ex: expiry, nx: true)
+ read_wal_locations[connection_name] = redis.get(key)
+ end
+
+ read_wal_locations
+ end
+
+ def job_wal_locations
+ return {} unless preserve_wal_location?
+
+ job['wal_locations'] || {}
+ end
+
def pg_wal_lsn_diff(connection_name)
- Gitlab::Database::DATABASES[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name])
+ Gitlab::Database.databases[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name])
end
def strategy
@@ -178,12 +204,6 @@ module Gitlab
job['jid']
end
- def job_wal_locations
- return {} unless preserve_wal_location?
-
- job['wal_locations'] || {}
- end
-
def existing_wal_location_key(connection_name)
"#{idempotency_key}:#{connection_name}:existing_wal_location"
end
@@ -208,23 +228,8 @@ module Gitlab
"#{worker_class_name}:#{Sidekiq.dump_json(arguments)}"
end
- def delete_wal_locations!(redis)
- job_wal_locations.keys.each do |connection_name|
- redis.del(wal_location_key(connection_name))
- redis.del(existing_wal_location_key(connection_name))
- end
- end
-
- def check_existing_wal_locations!(redis, expiry)
- read_wal_locations = {}
-
- job_wal_locations.each do |connection_name, location|
- key = existing_wal_location_key(connection_name)
- redis.set(key, location, ex: expiry, nx: true)
- read_wal_locations[connection_name] = redis.get(key)
- end
-
- read_wal_locations
+ def existing_wal_locations
+ @existing_wal_locations ||= {}
end
def preserve_wal_location?