# frozen_string_literal: true

module Gitlab
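  # Replacement for Sidekiq's built-in scheduled-job enqueuer
  # (Sidekiq::Scheduled::Enq). On each polling cycle it moves jobs whose
  # scheduled time has passed from the scheduled/retry sorted sets onto
  # their work queues.
  #
  # Wiring sketch (an assumption about the surrounding initializer, not part
  # of this file): Sidekiq's poller uses this class once it is registered as
  # the scheduled enqueuer, e.g.
  #
  #   Sidekiq.configure_server do |config|
  #     config.options[:scheduled_enq] = Gitlab::SidekiqEnq
  #   end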
  class SidekiqEnq
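    # Server-side Lua script that finds the first job in the sorted set whose
    # score (scheduled execution time) is <= ARGV[1] and removes it, all in a
    # single atomic Redis call. Returns the job payload, or nil when nothing
    # is due.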
    LUA_ZPOPBYSCORE = <<~EOS
      local key, now = KEYS[1], ARGV[1]
      local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1)
      if jobs[1] then
        redis.call("zrem", key, jobs[1])
        return jobs[1]
      end
    EOS

    # SHA1 digest of the script above, used to invoke it cheaply via EVALSHA
    # (see #redis_eval_lua).
    LUA_ZPOPBYSCORE_SHA = Digest::SHA1.hexdigest(LUA_ZPOPBYSCORE)

    # Entry point invoked by Sidekiq's scheduled-job poller. Drains every job
    # that is due (score <= now) from each sorted set, which by default are
    # Sidekiq's 'retry' and 'schedule' sets.
    def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = Sidekiq::Scheduled::SETS)
      Rails.application.reloader.wrap do
        ::Gitlab::WithRequestStore.with_request_store do
          if Feature.enabled?(:atomic_sidekiq_scheduler, default_enabled: :yaml)
            atomic_find_jobs_and_enqueue(now, sorted_sets)
          else
            find_jobs_and_enqueue(now, sorted_sets)
          end

        ensure
          ::Gitlab::Database::LoadBalancing.release_hosts
        end
      end
    end

    private

    # This is a copy of https://github.com/mperham/sidekiq/blob/32c55e31659a1e6bd42f98334cca5eef2863de8d/lib/sidekiq/scheduled.rb#L11-L34
    #
    # It effectively reverts
    # https://github.com/mperham/sidekiq/commit/9b75467b33759888753191413eddbc15c37a219e
    # because we observe that the extra ZREMs caused by this change can lead to high
    # CPU usage on Redis at peak times:
    # https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1179
    #
    def find_jobs_and_enqueue(now, sorted_sets)
      # A job's "score" in Redis is the time at which it should be processed.
      # Just check Redis for the set of jobs with a timestamp before now.
      Sidekiq.redis do |conn|
        sorted_sets.each do |sorted_set|
          start_time = ::Gitlab::Metrics::System.monotonic_time
          jobs = redundant_jobs = 0

          Sidekiq.logger.info(message: 'Enqueuing scheduled jobs', status: 'start', sorted_set: sorted_set)

          # Get the next item in the queue if its score (time to execute) is <= now.
          # We go through the list one at a time so that, if something goes wrong
          # between popping a job from the scheduled set and pushing it onto a work
          # queue, at most one job is at risk of being lost.
          while job = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 1]).first
            # Pop item off the queue and add it to the work queue. If the job can't be popped from
            # the queue, it's because another process already popped it so we can move on to the
            # next one.
            if conn.zrem(sorted_set, job)
              jobs += 1
              Sidekiq::Client.push(Sidekiq.load_json(job))
            else
              redundant_jobs += 1
            end
          end

          end_time = ::Gitlab::Metrics::System.monotonic_time
          Sidekiq.logger.info(message: 'Enqueuing scheduled jobs',
                              status: 'done',
                              sorted_set: sorted_set,
                              jobs_count: jobs,
                              redundant_jobs_count: redundant_jobs,
                              duration_s: end_time - start_time)
        end
      end
    end

    # Variant used when the :atomic_sidekiq_scheduler feature flag is enabled.
    # The Lua script pops and removes a due job in a single Redis call, so a
    # job can never be claimed by two processes and no redundant ZREMs occur.
    def atomic_find_jobs_and_enqueue(now, sorted_sets)
      Sidekiq.redis do |conn|
        sorted_sets.each do |sorted_set|
          start_time = ::Gitlab::Metrics::System.monotonic_time
          jobs = 0

          Sidekiq.logger.info(message: 'Atomically enqueuing scheduled jobs', status: 'start', sorted_set: sorted_set)

          while job = redis_eval_lua(conn, LUA_ZPOPBYSCORE, LUA_ZPOPBYSCORE_SHA, keys: [sorted_set], argv: [now])
            jobs += 1
            Sidekiq::Client.push(Sidekiq.load_json(job))
          end

          end_time = ::Gitlab::Metrics::System.monotonic_time
          Sidekiq.logger.info(message: 'Atomically enqueuing scheduled jobs',
                              status: 'done',
                              sorted_set: sorted_set,
                              jobs_count: jobs,
                              duration_s: end_time - start_time)
        end
      end
    end

    # Executes the script via EVALSHA to avoid resending the script body on
    # every call; if Redis has not cached it yet (NOSCRIPT), falls back to
    # EVAL, which loads and runs it.
    def redis_eval_lua(conn, script, sha, keys: nil, argv: nil)
      conn.evalsha(sha, keys: keys, argv: argv)
    rescue ::Redis::CommandError => e
      if e.message.start_with?('NOSCRIPT')
        conn.eval(script, keys: keys, argv: argv)
      else
        raise
      end
    end
  end
end