diff options
Diffstat (limited to 'lib/gitlab/sidekiq_migrate_jobs.rb')
-rw-r--r-- | lib/gitlab/sidekiq_migrate_jobs.rb | 66 |
1 files changed, 57 insertions, 9 deletions
diff --git a/lib/gitlab/sidekiq_migrate_jobs.rb b/lib/gitlab/sidekiq_migrate_jobs.rb index 62d62bf82c4..2467dd7ca43 100644 --- a/lib/gitlab/sidekiq_migrate_jobs.rb +++ b/lib/gitlab/sidekiq_migrate_jobs.rb @@ -3,16 +3,18 @@ module Gitlab class SidekiqMigrateJobs LOG_FREQUENCY = 1_000 + LOG_FREQUENCY_QUEUES = 10 - attr_reader :sidekiq_set, :logger + attr_reader :logger, :mappings - def initialize(sidekiq_set, logger: nil) - @sidekiq_set = sidekiq_set + # mappings is a hash of WorkerClassName => target_queue_name + def initialize(mappings, logger: nil) + @mappings = mappings @logger = logger end - # mappings is a hash of WorkerClassName => target_queue_name - def execute(mappings) + # Migrate jobs in SortedSets, i.e. scheduled and retry sets. + def migrate_set(sidekiq_set) source_queues_regex = Regexp.union(mappings.keys) cursor = 0 scanned = 0 @@ -33,7 +35,7 @@ module Gitlab next unless job.match?(source_queues_regex) - job_hash = Sidekiq.load_json(job) + job_hash = Gitlab::Json.load(job) destination_queue = mappings[job_hash['class']] next unless mappings.has_key?(job_hash['class']) @@ -41,7 +43,7 @@ module Gitlab job_hash['queue'] = destination_queue - migrated += migrate_job(job, score, job_hash) + migrated += migrate_job_in_set(sidekiq_set, job, score, job_hash) end end while cursor.to_i != 0 @@ -53,14 +55,54 @@ module Gitlab } end + # Migrates jobs from queues that are outside the mappings + def migrate_queues + routing_rules_queues = mappings.values.uniq + logger&.info("List of queues based on routing rules: #{routing_rules_queues}") + Sidekiq.redis do |conn| + # Redis 6 supports conn.scan_each(match: "queue:*", type: 'list') + conn.scan_each(match: "queue:*") do |key| + # Redis 5 compatibility + next unless conn.type(key) == 'list' + + queue_from = key.split(':', 2).last + next if routing_rules_queues.include?(queue_from) + + logger&.info("Migrating #{queue_from} queue") + + migrated = 0 + while queue_length(queue_from) > 0 + begin + if migrated >= 0 && migrated % LOG_FREQUENCY_QUEUES == 0 + logger&.info("Migrating from #{queue_from}. Total: #{queue_length(queue_from)}. Migrated: #{migrated}.") + end + + job = conn.rpop "queue:#{queue_from}" + job_hash = Gitlab::Json.load(job) + next unless mappings.has_key?(job_hash['class']) + + destination_queue = mappings[job_hash['class']] + job_hash['queue'] = destination_queue + conn.lpush("queue:#{destination_queue}", Gitlab::Json.dump(job_hash)) + migrated += 1 + rescue JSON::ParserError + logger&.error("Unmarshal JSON payload from SidekiqMigrateJobs failed. Job: #{job}") + next + end + end + logger&.info("Finished migrating #{queue_from} queue") + end + end + end + private - def migrate_job(job, score, job_hash) + def migrate_job_in_set(sidekiq_set, job, score, job_hash) Sidekiq.redis do |connection| removed = connection.zrem(sidekiq_set, job) if removed - connection.zadd(sidekiq_set, score, Sidekiq.dump_json(job_hash)) + connection.zadd(sidekiq_set, score, Gitlab::Json.dump(job_hash)) 1 else @@ -68,5 +110,11 @@ module Gitlab end end end + + def queue_length(queue_name) + Sidekiq.redis do |conn| + conn.llen("queue:#{queue_name}") + end + end end end |