summaryrefslogtreecommitdiff
path: root/lib/gitlab/database/migrations/sidekiq_helpers.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/database/migrations/sidekiq_helpers.rb')
-rw-r--r--lib/gitlab/database/migrations/sidekiq_helpers.rb112
1 files changed, 112 insertions, 0 deletions
diff --git a/lib/gitlab/database/migrations/sidekiq_helpers.rb b/lib/gitlab/database/migrations/sidekiq_helpers.rb
new file mode 100644
index 00000000000..c536b33bbdf
--- /dev/null
+++ b/lib/gitlab/database/migrations/sidekiq_helpers.rb
@@ -0,0 +1,112 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module Migrations
+ # rubocop:disable Cop/SidekiqApiUsage
+ # rubocop:disable Cop/SidekiqRedisCall
+ module SidekiqHelpers
+ # Constants for default sidekiq_remove_jobs values
+ DEFAULT_MAX_ATTEMPTS = 5
+ DEFAULT_TIMES_IN_A_ROW = 2
+
+ # Probabilistically removes job_klasses from their specific queues, the
+ # retry set and the scheduled set.
+ #
+ # If jobs are still being processed at the same time, then there is a
+ # small chance it will not remove all instances of job_klass. To
+ # minimize this risk, it repeatedly removes matching jobs from each
+ # until nothing is removed twice in a row.
+ #
+ # Before calling this method, you should make sure that job_klass is no
+ # longer being scheduled within the running application.
+ def sidekiq_remove_jobs(
+ job_klasses:,
+ times_in_a_row: DEFAULT_TIMES_IN_A_ROW,
+ max_attempts: DEFAULT_MAX_ATTEMPTS
+ )
+
+ kwargs = { times_in_a_row: times_in_a_row, max_attempts: max_attempts }
+
+ job_klasses_queues = job_klasses
+ .select { |job_klass| job_klass.to_s.safe_constantize.present? }
+ .map { |job_klass| job_klass.safe_constantize.queue }
+ .uniq
+
+ job_klasses_queues.each do |queue|
+ delete_jobs_for(
+ set: Sidekiq::Queue.new(queue),
+ job_klasses: job_klasses,
+ kwargs: kwargs
+ )
+ end
+
+ delete_jobs_for(
+ set: Sidekiq::RetrySet.new,
+ kwargs: kwargs,
+ job_klasses: job_klasses
+ )
+
+ delete_jobs_for(
+ set: Sidekiq::ScheduledSet.new,
+ kwargs: kwargs,
+ job_klasses: job_klasses
+ )
+ end
+
+ def sidekiq_queue_migrate(queue_from, to:)
+ while sidekiq_queue_length(queue_from) > 0
+ Sidekiq.redis do |conn|
+ conn.rpoplpush "queue:#{queue_from}", "queue:#{to}"
+ end
+ end
+ end
+
+ def sidekiq_queue_length(queue_name)
+ Sidekiq.redis do |conn|
+ conn.llen("queue:#{queue_name}")
+ end
+ end
+
+ private
+
+ # Handle the "jobs deleted" tracking that is needed in order to track
+ # whether a job was deleted or not.
+ def delete_jobs_for(set:, kwargs:, job_klasses:)
+ until_equal_to(0, **kwargs) do
+ set.count do |job|
+ job_klasses.include?(job.klass) && job.delete
+ end
+ end
+ end
+
+ # Control how many times in a row you want to see a job deleted 0
+ # times. The idea is that if you see 0 jobs deleted x number of times
+ # in a row you've *likely* covered the case in which the queue was
+ # mutating while this was running.
+ def until_equal_to(target, times_in_a_row:, max_attempts:)
+ streak = 0
+
+ result = { attempts: 0, success: false }
+
+ 1.upto(max_attempts) do |current_attempt|
+ # yield's return value is a count of "jobs_deleted"
+ if yield == target
+ streak += 1
+ elsif streak > 0
+ streak = 0
+ end
+
+ result[:attempts] = current_attempt
+ result[:success] = streak == times_in_a_row
+
+ break if result[:success]
+ end
+ result
+ end
+ end
+ # rubocop:enable Cop/SidekiqApiUsage
+ # rubocop:enable Cop/SidekiqRedisCall
+ end
+ end
+end