summaryrefslogtreecommitdiff
path: root/lib/gitlab/sidekiq_migrate_jobs.rb
blob: 62d62bf82c4e04012c18d7003df8f2520d229317 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# frozen_string_literal: true

module Gitlab
  # Rewrites the 'queue' field of jobs stored in a Redis sorted set (accessed
  # through Sidekiq.redis) so that jobs of the given worker classes point at
  # their new target queues. Each job keeps its original score.
  class SidekiqMigrateJobs
    # A progress line is logged each time this many records have been scanned.
    LOG_FREQUENCY = 1_000

    attr_reader :sidekiq_set, :logger

    # @param sidekiq_set [String] key of the sorted set to process
    # @param logger [#info, nil] optional logger; nothing is logged when nil
    def initialize(sidekiq_set, logger: nil)
      @sidekiq_set = sidekiq_set
      @logger = logger
    end

    # Scans the whole set and moves every matching job to its mapped queue.
    #
    # mappings is a hash of WorkerClassName => target_queue_name
    #
    # @param mappings [Hash{String => String}]
    # @return [Hash{Symbol => Integer}] counts under :scanned and :migrated
    def execute(mappings)
      source_queues_regex = Regexp.union(mappings.keys)
      cursor = 0
      scanned = 0
      migrated = 0

      estimated_size = Sidekiq.redis { |c| c.zcard(sidekiq_set) }
      logger&.info("Processing #{sidekiq_set} set. Estimated size: #{estimated_size}.")

      # Always fetch at least one page; a returned cursor of 0 means the scan is complete.
      loop do
        cursor, jobs = Sidekiq.redis { |c| c.zscan(sidekiq_set, cursor) }

        jobs.each do |(job, score)|
          log_progress(scanned, migrated)
          scanned += 1

          job_hash = retargeted_job_hash(job, mappings, source_queues_regex)
          next unless job_hash

          migrated += migrate_job(job, score, job_hash)
        end

        break if cursor.to_i == 0
      end

      logger&.info("Done. Scanned records: #{scanned}. Migrated records: #{migrated}.")

      {
        scanned: scanned,
        migrated: migrated
      }
    end

    private

    # Logs a progress line once every LOG_FREQUENCY scanned records.
    def log_progress(scanned, migrated)
      return unless scanned > 0 && scanned % LOG_FREQUENCY == 0

      logger&.info("In progress. Scanned records: #{scanned}. Migrated records: #{migrated}.")
    end

    # Returns the parsed job with its 'queue' replaced by the mapped queue, or
    # nil when the job does not need to be migrated.
    def retargeted_job_hash(job, mappings, source_queues_regex)
      # Cheap textual pre-filter so that unrelated payloads are never parsed.
      return unless job.match?(source_queues_regex)

      job_hash = Sidekiq.load_json(job)
      worker_class = job_hash['class']

      # The regex can match a class name anywhere in the payload (e.g. in the
      # arguments), so confirm against the actual 'class' field.
      return unless mappings.key?(worker_class)

      destination_queue = mappings[worker_class]

      # Already on the destination queue. This also covers a job this run has
      # rewritten, should the scan return it again.
      return if job_hash['queue'] == destination_queue

      job_hash['queue'] = destination_queue
      job_hash
    end

    # Replaces the serialized job in the set with its updated payload, keeping
    # the same score.
    #
    # @return [Integer] 1 when the job was migrated, 0 when the original entry
    #   was no longer in the set
    def migrate_job(job, score, job_hash)
      Sidekiq.redis do |connection|
        if removed?(connection.zrem(sidekiq_set, job))
          connection.zadd(sidekiq_set, score, Sidekiq.dump_json(job_hash))

          1
        else
          0
        end
      end
    end

    # Interprets a ZREM reply. Some clients return true/false for a single
    # member, others return the raw Integer count. Since 0 is truthy in Ruby,
    # an Integer reply has to be compared explicitly; otherwise a job that was
    # already removed would be re-added, producing a duplicate.
    def removed?(zrem_reply)
      case zrem_reply
      when Integer then zrem_reply.positive?
      else zrem_reply ? true : false
      end
    end
  end
end