diff options
Diffstat (limited to 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb')
-rw-r--r-- | lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb | 73 |
1 files changed, 39 insertions, 34 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb index aeb58d7c153..e63164efc94 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb @@ -64,9 +64,9 @@ module Gitlab Sidekiq.redis do |redis| redis.multi do |multi| - redis.set(idempotency_key, jid, ex: expiry, nx: true) - read_wal_locations = check_existing_wal_locations!(redis, expiry) - read_jid = redis.get(idempotency_key) + multi.set(idempotency_key, jid, ex: expiry, nx: true) + read_wal_locations = check_existing_wal_locations!(multi, expiry) + read_jid = multi.get(idempotency_key) end end @@ -81,9 +81,9 @@ module Gitlab return unless job_wal_locations.present? Sidekiq.redis do |redis| - redis.multi do + redis.multi do |multi| job_wal_locations.each do |connection_name, location| - redis.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]) + multi.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]) end end end @@ -96,9 +96,9 @@ module Gitlab read_wal_locations = {} Sidekiq.redis do |redis| - redis.multi do + redis.multi do |multi| job_wal_locations.keys.each do |connection_name| - read_wal_locations[connection_name] = redis.lindex(wal_location_key(connection_name), 0) + read_wal_locations[connection_name] = multi.lindex(wal_location_key(connection_name), 0) end end end @@ -110,8 +110,8 @@ module Gitlab def delete! Sidekiq.redis do |redis| redis.multi do |multi| - redis.del(idempotency_key) - delete_wal_locations!(redis) + multi.del(idempotency_key) + delete_wal_locations!(multi) end end end @@ -140,13 +140,14 @@ module Gitlab def idempotent? return false unless worker_klass return false unless worker_klass.respond_to?(:idempotent?) + return false unless preserve_wal_location? || !worker_klass.utilizes_load_balancing_capabilities? worker_klass.idempotent? end private - attr_accessor :existing_wal_locations + attr_writer :existing_wal_locations attr_reader :queue_name, :job attr_writer :existing_jid @@ -154,8 +155,33 @@ module Gitlab @worker_klass ||= worker_class_name.to_s.safe_constantize end + def delete_wal_locations!(redis) + job_wal_locations.keys.each do |connection_name| + redis.del(wal_location_key(connection_name)) + redis.del(existing_wal_location_key(connection_name)) + end + end + + def check_existing_wal_locations!(redis, expiry) + read_wal_locations = {} + + job_wal_locations.each do |connection_name, location| + key = existing_wal_location_key(connection_name) + redis.set(key, location, ex: expiry, nx: true) + read_wal_locations[connection_name] = redis.get(key) + end + + read_wal_locations + end + + def job_wal_locations + return {} unless preserve_wal_location? + + job['wal_locations'] || {} + end + def pg_wal_lsn_diff(connection_name) - Gitlab::Database::DATABASES[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name]) + Gitlab::Database.databases[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name]) end def strategy @@ -178,12 +204,6 @@ module Gitlab job['jid'] end - def job_wal_locations - return {} unless preserve_wal_location? - - job['wal_locations'] || {} - end - def existing_wal_location_key(connection_name) "#{idempotency_key}:#{connection_name}:existing_wal_location" end @@ -208,23 +228,8 @@ module Gitlab "#{worker_class_name}:#{Sidekiq.dump_json(arguments)}" end - def delete_wal_locations!(redis) - job_wal_locations.keys.each do |connection_name| - redis.del(wal_location_key(connection_name)) - redis.del(existing_wal_location_key(connection_name)) - end - end - - def check_existing_wal_locations!(redis, expiry) - read_wal_locations = {} - - job_wal_locations.each do |connection_name, location| - key = existing_wal_location_key(connection_name) - redis.set(key, location, ex: expiry, nx: true) - read_wal_locations[connection_name] = redis.get(key) - end - - read_wal_locations + def existing_wal_locations + @existing_wal_locations ||= {} end def preserve_wal_location? |