diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2021-11-18 13:16:36 +0000 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2021-11-18 13:16:36 +0000 |
commit | 311b0269b4eb9839fa63f80c8d7a58f32b8138a0 (patch) | |
tree | 07e7870bca8aed6d61fdcc810731c50d2c40af47 /app/workers/concerns/application_worker.rb | |
parent | 27909cef6c4170ed9205afa7426b8d3de47cbb0c (diff) | |
download | gitlab-ce-311b0269b4eb9839fa63f80c8d7a58f32b8138a0.tar.gz |
Add latest changes from gitlab-org/gitlab@14-5-stable-eev14.5.0-rc42
Diffstat (limited to 'app/workers/concerns/application_worker.rb')
-rw-r--r-- | app/workers/concerns/application_worker.rb | 95 |
1 files changed, 87 insertions, 8 deletions
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index 3399a4f9b57..03a0b5fae00 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -14,6 +14,7 @@ module ApplicationWorker LOGGING_EXTRA_KEY = 'extra' DEFAULT_DELAY_INTERVAL = 1 + SAFE_PUSH_BULK_LIMIT = 1000 included do set_queue @@ -54,6 +55,12 @@ module ApplicationWorker subclass.after_set_class_attribute { subclass.set_queue } end + def with_status + status_from_class = self.sidekiq_options_hash['status_expiration'] + + set(status_expiration: status_from_class || Gitlab::SidekiqStatus::DEFAULT_EXPIRATION) + end + def generated_queue_name Gitlab::SidekiqConfig::WorkerRouter.queue_name_from_worker_name(self) end @@ -130,29 +137,62 @@ module ApplicationWorker end end + def log_bulk_perform_async? + @log_bulk_perform_async + end + + def log_bulk_perform_async! + @log_bulk_perform_async = true + end + def queue_size Sidekiq::Queue.new(queue).size end def bulk_perform_async(args_list) - Sidekiq::Client.push_bulk('class' => self, 'args' => args_list) + if log_bulk_perform_async? + Sidekiq.logger.info('class' => self.name, 'args_list' => args_list, 'args_list_count' => args_list.length, 'message' => 'Inserting multiple jobs') + end + + do_push_bulk(args_list).tap do |job_ids| + if log_bulk_perform_async? + Sidekiq.logger.info('class' => self.name, 'jid_list' => job_ids, 'jid_list_count' => job_ids.length, 'message' => 'Completed JID insertion') + end + end end def bulk_perform_in(delay, args_list, batch_size: nil, batch_delay: nil) now = Time.now.to_i - schedule = now + delay.to_i + base_schedule_at = now + delay.to_i - if schedule <= now - raise ArgumentError, _('The schedule time must be in the future!') + if base_schedule_at <= now + raise ArgumentError, 'The schedule time must be in the future!' end + schedule_at = base_schedule_at + if batch_size && batch_delay - args_list.each_slice(batch_size.to_i).with_index do |args_batch, idx| - batch_schedule = schedule + idx * batch_delay.to_i - Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => batch_schedule) + batch_size = batch_size.to_i + batch_delay = batch_delay.to_i + + raise ArgumentError, 'batch_size should be greater than 0' unless batch_size > 0 + raise ArgumentError, 'batch_delay should be greater than 0' unless batch_delay > 0 + + # build an array of schedules corresponding to each item in `args_list` + bulk_schedule_at = Array.new(args_list.size) do |index| + batch_number = index / batch_size + base_schedule_at + (batch_number * batch_delay) + end + + schedule_at = bulk_schedule_at + end + + if Feature.enabled?(:sidekiq_push_bulk_in_batches) + in_safe_limit_batches(args_list, schedule_at) do |args_batch, schedule_at_for_batch| + Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => schedule_at_for_batch) end else - Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule) + Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule_at) end end @@ -161,5 +201,44 @@ module ApplicationWorker def delay_interval DEFAULT_DELAY_INTERVAL.seconds end + + private + + def do_push_bulk(args_list) + if Feature.enabled?(:sidekiq_push_bulk_in_batches) + in_safe_limit_batches(args_list) do |args_batch, _| + Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch) + end + else + Sidekiq::Client.push_bulk('class' => self, 'args' => args_list) + end + end + + def in_safe_limit_batches(args_list, schedule_at = nil, safe_limit = SAFE_PUSH_BULK_LIMIT) + # `schedule_at` could be one of + # - nil. + # - a single Numeric that represents time, like `30.minutes.from_now.to_i`. + # - an array, where each element is a Numeric that reprsents time. + # - Each element in this array would correspond to the time at which + # - the job in `args_list` at the corresponding index needs to be scheduled. + + # In the case where `schedule_at` is an array of Numeric, it needs to be sliced + # in the same manner as the `args_list`, with each slice containing `safe_limit` + # number of elements. + schedule_at = schedule_at.each_slice(safe_limit).to_a if schedule_at.is_a?(Array) + + args_list.each_slice(safe_limit).with_index.flat_map do |args_batch, index| + schedule_at_for_batch = process_schedule_at_for_batch(schedule_at, index) + + yield(args_batch, schedule_at_for_batch) + end + end + + def process_schedule_at_for_batch(schedule_at, index) + return unless schedule_at + return schedule_at[index] if schedule_at.is_a?(Array) && schedule_at.all?(Array) + + schedule_at + end end end |