diff options
Diffstat (limited to 'lib/gitlab/sidekiq_enq.rb')
-rw-r--r-- | lib/gitlab/sidekiq_enq.rb | 80 |
1 files changed, 70 insertions, 10 deletions
diff --git a/lib/gitlab/sidekiq_enq.rb b/lib/gitlab/sidekiq_enq.rb index d8a01ac8ef4..de0c00fe561 100644 --- a/lib/gitlab/sidekiq_enq.rb +++ b/lib/gitlab/sidekiq_enq.rb @@ -1,16 +1,44 @@ # frozen_string_literal: true -# This is a copy of https://github.com/mperham/sidekiq/blob/32c55e31659a1e6bd42f98334cca5eef2863de8d/lib/sidekiq/scheduled.rb#L11-L34 -# -# It effectively reverts -# https://github.com/mperham/sidekiq/commit/9b75467b33759888753191413eddbc15c37a219e -# because we observe that the extra ZREMs caused by this change can lead to high -# CPU usage on Redis at peak times: -# https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1179 -# module Gitlab class SidekiqEnq + LUA_ZPOPBYSCORE = <<~EOS + local key, now = KEYS[1], ARGV[1] + local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1) + if jobs[1] then + redis.call("zrem", key, jobs[1]) + return jobs[1] + end + EOS + + LUA_ZPOPBYSCORE_SHA = Digest::SHA1.hexdigest(LUA_ZPOPBYSCORE) + def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = Sidekiq::Scheduled::SETS) + Rails.application.reloader.wrap do + ::Gitlab::WithRequestStore.with_request_store do + if Feature.enabled?(:atomic_sidekiq_scheduler, default_enabled: :yaml) + atomic_find_jobs_and_enqueue(now, sorted_sets) + else + find_jobs_and_enqueue(now, sorted_sets) + end + + ensure + ::Gitlab::Database::LoadBalancing.release_hosts + end + end + end + + private + + # This is a copy of https://github.com/mperham/sidekiq/blob/32c55e31659a1e6bd42f98334cca5eef2863de8d/lib/sidekiq/scheduled.rb#L11-L34 + # + # It effectively reverts + # https://github.com/mperham/sidekiq/commit/9b75467b33759888753191413eddbc15c37a219e + # because we observe that the extra ZREMs caused by this change can lead to high + # CPU usage on Redis at peak times: + # https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1179 + # + def find_jobs_and_enqueue(now, sorted_sets) # A job's "score" in Redis is the time at which it should be processed. # Just check Redis for the set of jobs with a timestamp before now. Sidekiq.redis do |conn| @@ -24,8 +52,7 @@ module Gitlab # We need to go through the list one at a time to reduce the risk of something # going wrong between the time jobs are popped from the scheduled queue and when # they are pushed onto a work queue and losing the jobs. - while (job = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 1]).first) - + while job = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 1]).first # Pop item off the queue and add it to the work queue. If the job can't be popped from # the queue, it's because another process already popped it so we can move on to the # next one. @@ -47,5 +74,38 @@ module Gitlab end end end + + def atomic_find_jobs_and_enqueue(now, sorted_sets) + Sidekiq.redis do |conn| + sorted_sets.each do |sorted_set| + start_time = ::Gitlab::Metrics::System.monotonic_time + jobs = 0 + + Sidekiq.logger.info(message: 'Atomically enqueuing scheduled jobs', status: 'start', sorted_set: sorted_set) + + while job = redis_eval_lua(conn, LUA_ZPOPBYSCORE, LUA_ZPOPBYSCORE_SHA, keys: [sorted_set], argv: [now]) + jobs += 1 + Sidekiq::Client.push(Sidekiq.load_json(job)) + end + + end_time = ::Gitlab::Metrics::System.monotonic_time + Sidekiq.logger.info(message: 'Atomically enqueuing scheduled jobs', + status: 'done', + sorted_set: sorted_set, + jobs_count: jobs, + duration_s: end_time - start_time) + end + end + end + + def redis_eval_lua(conn, script, sha, keys: nil, argv: nil) + conn.evalsha(sha, keys: keys, argv: argv) + rescue ::Redis::CommandError => e + if e.message.start_with?('NOSCRIPT') + conn.eval(script, keys: keys, argv: argv) + else + raise + end + end end end |