summaryrefslogtreecommitdiff
path: root/lib/gitlab/sidekiq_enq.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/sidekiq_enq.rb')
-rw-r--r--lib/gitlab/sidekiq_enq.rb80
1 files changed, 70 insertions, 10 deletions
diff --git a/lib/gitlab/sidekiq_enq.rb b/lib/gitlab/sidekiq_enq.rb
index d8a01ac8ef4..de0c00fe561 100644
--- a/lib/gitlab/sidekiq_enq.rb
+++ b/lib/gitlab/sidekiq_enq.rb
@@ -1,16 +1,44 @@
# frozen_string_literal: true
-# This is a copy of https://github.com/mperham/sidekiq/blob/32c55e31659a1e6bd42f98334cca5eef2863de8d/lib/sidekiq/scheduled.rb#L11-L34
-#
-# It effectively reverts
-# https://github.com/mperham/sidekiq/commit/9b75467b33759888753191413eddbc15c37a219e
-# because we observe that the extra ZREMs caused by this change can lead to high
-# CPU usage on Redis at peak times:
-# https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1179
-#
module Gitlab
class SidekiqEnq
+ LUA_ZPOPBYSCORE = <<~EOS
+ local key, now = KEYS[1], ARGV[1]
+ local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1)
+ if jobs[1] then
+ redis.call("zrem", key, jobs[1])
+ return jobs[1]
+ end
+ EOS
+
+ LUA_ZPOPBYSCORE_SHA = Digest::SHA1.hexdigest(LUA_ZPOPBYSCORE)
+
def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = Sidekiq::Scheduled::SETS)
+ Rails.application.reloader.wrap do
+ ::Gitlab::WithRequestStore.with_request_store do
+ if Feature.enabled?(:atomic_sidekiq_scheduler, default_enabled: :yaml)
+ atomic_find_jobs_and_enqueue(now, sorted_sets)
+ else
+ find_jobs_and_enqueue(now, sorted_sets)
+ end
+
+ ensure
+ ::Gitlab::Database::LoadBalancing.release_hosts
+ end
+ end
+ end
+
+ private
+
+ # This is a copy of https://github.com/mperham/sidekiq/blob/32c55e31659a1e6bd42f98334cca5eef2863de8d/lib/sidekiq/scheduled.rb#L11-L34
+ #
+ # It effectively reverts
+ # https://github.com/mperham/sidekiq/commit/9b75467b33759888753191413eddbc15c37a219e
+ # because we observe that the extra ZREMs caused by this change can lead to high
+ # CPU usage on Redis at peak times:
+ # https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1179
+ #
+ def find_jobs_and_enqueue(now, sorted_sets)
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
Sidekiq.redis do |conn|
@@ -24,8 +52,7 @@ module Gitlab
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
- while (job = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 1]).first)
-
+ while job = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 1]).first
# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
@@ -47,5 +74,38 @@ module Gitlab
end
end
end
+
+ def atomic_find_jobs_and_enqueue(now, sorted_sets)
+ Sidekiq.redis do |conn|
+ sorted_sets.each do |sorted_set|
+ start_time = ::Gitlab::Metrics::System.monotonic_time
+ jobs = 0
+
+ Sidekiq.logger.info(message: 'Atomically enqueuing scheduled jobs', status: 'start', sorted_set: sorted_set)
+
+ while job = redis_eval_lua(conn, LUA_ZPOPBYSCORE, LUA_ZPOPBYSCORE_SHA, keys: [sorted_set], argv: [now])
+ jobs += 1
+ Sidekiq::Client.push(Sidekiq.load_json(job))
+ end
+
+ end_time = ::Gitlab::Metrics::System.monotonic_time
+ Sidekiq.logger.info(message: 'Atomically enqueuing scheduled jobs',
+ status: 'done',
+ sorted_set: sorted_set,
+ jobs_count: jobs,
+ duration_s: end_time - start_time)
+ end
+ end
+ end
+
+ def redis_eval_lua(conn, script, sha, keys: nil, argv: nil)
+ conn.evalsha(sha, keys: keys, argv: argv)
+ rescue ::Redis::CommandError => e
+ if e.message.start_with?('NOSCRIPT')
+ conn.eval(script, keys: keys, argv: argv)
+ else
+ raise
+ end
+ end
end
end