summaryrefslogtreecommitdiff
path: root/app/workers/concerns/application_worker.rb
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/concerns/application_worker.rb')
-rw-r--r--app/workers/concerns/application_worker.rb95
1 files changed, 87 insertions, 8 deletions
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb
index 3399a4f9b57..03a0b5fae00 100644
--- a/app/workers/concerns/application_worker.rb
+++ b/app/workers/concerns/application_worker.rb
@@ -14,6 +14,7 @@ module ApplicationWorker
LOGGING_EXTRA_KEY = 'extra'
DEFAULT_DELAY_INTERVAL = 1
+ SAFE_PUSH_BULK_LIMIT = 1000
included do
set_queue
@@ -54,6 +55,12 @@ module ApplicationWorker
subclass.after_set_class_attribute { subclass.set_queue }
end
+ def with_status
+ status_from_class = self.sidekiq_options_hash['status_expiration']
+
+ set(status_expiration: status_from_class || Gitlab::SidekiqStatus::DEFAULT_EXPIRATION)
+ end
+
def generated_queue_name
Gitlab::SidekiqConfig::WorkerRouter.queue_name_from_worker_name(self)
end
@@ -130,29 +137,62 @@ module ApplicationWorker
end
end
+ def log_bulk_perform_async?
+ @log_bulk_perform_async
+ end
+
+ def log_bulk_perform_async!
+ @log_bulk_perform_async = true
+ end
+
def queue_size
Sidekiq::Queue.new(queue).size
end
def bulk_perform_async(args_list)
- Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
+ if log_bulk_perform_async?
+ Sidekiq.logger.info('class' => self.name, 'args_list' => args_list, 'args_list_count' => args_list.length, 'message' => 'Inserting multiple jobs')
+ end
+
+ do_push_bulk(args_list).tap do |job_ids|
+ if log_bulk_perform_async?
+ Sidekiq.logger.info('class' => self.name, 'jid_list' => job_ids, 'jid_list_count' => job_ids.length, 'message' => 'Completed JID insertion')
+ end
+ end
end
def bulk_perform_in(delay, args_list, batch_size: nil, batch_delay: nil)
now = Time.now.to_i
- schedule = now + delay.to_i
+ base_schedule_at = now + delay.to_i
- if schedule <= now
- raise ArgumentError, _('The schedule time must be in the future!')
+ if base_schedule_at <= now
+ raise ArgumentError, 'The schedule time must be in the future!'
end
+ schedule_at = base_schedule_at
+
if batch_size && batch_delay
- args_list.each_slice(batch_size.to_i).with_index do |args_batch, idx|
- batch_schedule = schedule + idx * batch_delay.to_i
- Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => batch_schedule)
+ batch_size = batch_size.to_i
+ batch_delay = batch_delay.to_i
+
+ raise ArgumentError, 'batch_size should be greater than 0' unless batch_size > 0
+ raise ArgumentError, 'batch_delay should be greater than 0' unless batch_delay > 0
+
+ # build an array of schedules corresponding to each item in `args_list`
+ bulk_schedule_at = Array.new(args_list.size) do |index|
+ batch_number = index / batch_size
+ base_schedule_at + (batch_number * batch_delay)
+ end
+
+ schedule_at = bulk_schedule_at
+ end
+
+ if Feature.enabled?(:sidekiq_push_bulk_in_batches)
+ in_safe_limit_batches(args_list, schedule_at) do |args_batch, schedule_at_for_batch|
+ Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => schedule_at_for_batch)
end
else
- Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
+ Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule_at)
end
end
@@ -161,5 +201,44 @@ module ApplicationWorker
def delay_interval
DEFAULT_DELAY_INTERVAL.seconds
end
+
+ private
+
+ def do_push_bulk(args_list)
+ if Feature.enabled?(:sidekiq_push_bulk_in_batches)
+ in_safe_limit_batches(args_list) do |args_batch, _|
+ Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch)
+ end
+ else
+ Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
+ end
+ end
+
+ def in_safe_limit_batches(args_list, schedule_at = nil, safe_limit = SAFE_PUSH_BULK_LIMIT)
+ # `schedule_at` could be one of
+ # - nil.
+ # - a single Numeric that represents time, like `30.minutes.from_now.to_i`.
+ # - an array, where each element is a Numeric that reprsents time.
+ # - Each element in this array would correspond to the time at which
+ # - the job in `args_list` at the corresponding index needs to be scheduled.
+
+ # In the case where `schedule_at` is an array of Numeric, it needs to be sliced
+ # in the same manner as the `args_list`, with each slice containing `safe_limit`
+ # number of elements.
+ schedule_at = schedule_at.each_slice(safe_limit).to_a if schedule_at.is_a?(Array)
+
+ args_list.each_slice(safe_limit).with_index.flat_map do |args_batch, index|
+ schedule_at_for_batch = process_schedule_at_for_batch(schedule_at, index)
+
+ yield(args_batch, schedule_at_for_batch)
+ end
+ end
+
+ def process_schedule_at_for_batch(schedule_at, index)
+ return unless schedule_at
+ return schedule_at[index] if schedule_at.is_a?(Array) && schedule_at.all?(Array)
+
+ schedule_at
+ end
end
end