summaryrefslogtreecommitdiff
path: root/app/workers/concerns
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/concerns')
-rw-r--r--app/workers/concerns/application_worker.rb95
-rw-r--r--app/workers/concerns/gitlab/github_import/rescheduling_methods.rb4
-rw-r--r--app/workers/concerns/gitlab/github_import/stage_methods.rb6
-rw-r--r--app/workers/concerns/gitlab/jira_import/import_worker.rb2
-rw-r--r--app/workers/concerns/limited_capacity/worker.rb2
-rw-r--r--app/workers/concerns/new_issuable.rb8
-rw-r--r--app/workers/concerns/todos_destroyer_queue.rb2
7 files changed, 96 insertions, 23 deletions
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb
index 3399a4f9b57..03a0b5fae00 100644
--- a/app/workers/concerns/application_worker.rb
+++ b/app/workers/concerns/application_worker.rb
@@ -14,6 +14,7 @@ module ApplicationWorker
LOGGING_EXTRA_KEY = 'extra'
DEFAULT_DELAY_INTERVAL = 1
+ SAFE_PUSH_BULK_LIMIT = 1000
included do
set_queue
@@ -54,6 +55,12 @@ module ApplicationWorker
subclass.after_set_class_attribute { subclass.set_queue }
end
+ def with_status
+ status_from_class = self.sidekiq_options_hash['status_expiration']
+
+ set(status_expiration: status_from_class || Gitlab::SidekiqStatus::DEFAULT_EXPIRATION)
+ end
+
def generated_queue_name
Gitlab::SidekiqConfig::WorkerRouter.queue_name_from_worker_name(self)
end
@@ -130,29 +137,62 @@ module ApplicationWorker
end
end
+ def log_bulk_perform_async?
+ @log_bulk_perform_async
+ end
+
+ def log_bulk_perform_async!
+ @log_bulk_perform_async = true
+ end
+
def queue_size
Sidekiq::Queue.new(queue).size
end
def bulk_perform_async(args_list)
- Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
+ if log_bulk_perform_async?
+ Sidekiq.logger.info('class' => self.name, 'args_list' => args_list, 'args_list_count' => args_list.length, 'message' => 'Inserting multiple jobs')
+ end
+
+ do_push_bulk(args_list).tap do |job_ids|
+ if log_bulk_perform_async?
+ Sidekiq.logger.info('class' => self.name, 'jid_list' => job_ids, 'jid_list_count' => job_ids.length, 'message' => 'Completed JID insertion')
+ end
+ end
end
def bulk_perform_in(delay, args_list, batch_size: nil, batch_delay: nil)
now = Time.now.to_i
- schedule = now + delay.to_i
+ base_schedule_at = now + delay.to_i
- if schedule <= now
- raise ArgumentError, _('The schedule time must be in the future!')
+ if base_schedule_at <= now
+ raise ArgumentError, 'The schedule time must be in the future!'
end
+ schedule_at = base_schedule_at
+
if batch_size && batch_delay
- args_list.each_slice(batch_size.to_i).with_index do |args_batch, idx|
- batch_schedule = schedule + idx * batch_delay.to_i
- Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => batch_schedule)
+ batch_size = batch_size.to_i
+ batch_delay = batch_delay.to_i
+
+ raise ArgumentError, 'batch_size should be greater than 0' unless batch_size > 0
+ raise ArgumentError, 'batch_delay should be greater than 0' unless batch_delay > 0
+
+ # build an array of schedules corresponding to each item in `args_list`
+ bulk_schedule_at = Array.new(args_list.size) do |index|
+ batch_number = index / batch_size
+ base_schedule_at + (batch_number * batch_delay)
+ end
+
+ schedule_at = bulk_schedule_at
+ end
+
+ if Feature.enabled?(:sidekiq_push_bulk_in_batches)
+ in_safe_limit_batches(args_list, schedule_at) do |args_batch, schedule_at_for_batch|
+ Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => schedule_at_for_batch)
end
else
- Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
+ Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule_at)
end
end
@@ -161,5 +201,44 @@ module ApplicationWorker
def delay_interval
DEFAULT_DELAY_INTERVAL.seconds
end
+
+ private
+
+ def do_push_bulk(args_list)
+ if Feature.enabled?(:sidekiq_push_bulk_in_batches)
+ in_safe_limit_batches(args_list) do |args_batch, _|
+ Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch)
+ end
+ else
+ Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
+ end
+ end
+
+ def in_safe_limit_batches(args_list, schedule_at = nil, safe_limit = SAFE_PUSH_BULK_LIMIT)
+ # `schedule_at` could be one of
+ # - nil.
+ # - a single Numeric that represents time, like `30.minutes.from_now.to_i`.
+ # - an array, where each element is a Numeric that reprsents time.
+ # - Each element in this array would correspond to the time at which
+ # - the job in `args_list` at the corresponding index needs to be scheduled.
+
+ # In the case where `schedule_at` is an array of Numeric, it needs to be sliced
+ # in the same manner as the `args_list`, with each slice containing `safe_limit`
+ # number of elements.
+ schedule_at = schedule_at.each_slice(safe_limit).to_a if schedule_at.is_a?(Array)
+
+ args_list.each_slice(safe_limit).with_index.flat_map do |args_batch, index|
+ schedule_at_for_batch = process_schedule_at_for_batch(schedule_at, index)
+
+ yield(args_batch, schedule_at_for_batch)
+ end
+ end
+
+ def process_schedule_at_for_batch(schedule_at, index)
+ return unless schedule_at
+ return schedule_at[index] if schedule_at.is_a?(Array) && schedule_at.all?(Array)
+
+ schedule_at
+ end
end
end
diff --git a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb
index eb1af0869bd..0a43a0fc4d2 100644
--- a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb
+++ b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb
@@ -8,9 +8,8 @@ module Gitlab
# project_id - The ID of the GitLab project to import the note into.
# hash - A Hash containing the details of the GitHub object to import.
# notify_key - The Redis key to notify upon completion, if any.
- # rubocop: disable CodeReuse/ActiveRecord
def perform(project_id, hash, notify_key = nil)
- project = Project.find_by(id: project_id)
+ project = Project.find_by_id(project_id)
return notify_waiter(notify_key) unless project
@@ -25,7 +24,6 @@ module Gitlab
.perform_in(client.rate_limit_resets_in, project.id, hash, notify_key)
end
end
- # rubocop: enable CodeReuse/ActiveRecord
def try_import(*args)
import(*args)
diff --git a/app/workers/concerns/gitlab/github_import/stage_methods.rb b/app/workers/concerns/gitlab/github_import/stage_methods.rb
index d7b4578af63..225716f6bf3 100644
--- a/app/workers/concerns/gitlab/github_import/stage_methods.rb
+++ b/app/workers/concerns/gitlab/github_import/stage_methods.rb
@@ -33,13 +33,13 @@ module Gitlab
self.class.perform_in(client.rate_limit_resets_in, project.id)
end
- # rubocop: disable CodeReuse/ActiveRecord
def find_project(id)
# If the project has been marked as failed we want to bail out
# automatically.
- Project.joins_import_state.where(import_state: { status: :started }).find_by(id: id)
+ # rubocop: disable CodeReuse/ActiveRecord
+ Project.joins_import_state.where(import_state: { status: :started }).find_by_id(id)
+ # rubocop: enable CodeReuse/ActiveRecord
end
- # rubocop: enable CodeReuse/ActiveRecord
def abort_on_failure
false
diff --git a/app/workers/concerns/gitlab/jira_import/import_worker.rb b/app/workers/concerns/gitlab/jira_import/import_worker.rb
index 107b6e2e9be..d18b9ff023b 100644
--- a/app/workers/concerns/gitlab/jira_import/import_worker.rb
+++ b/app/workers/concerns/gitlab/jira_import/import_worker.rb
@@ -14,7 +14,7 @@ module Gitlab
end
def perform(project_id)
- project = Project.find_by(id: project_id) # rubocop: disable CodeReuse/ActiveRecord
+ project = Project.find_by_id(project_id)
return unless can_import?(project)
diff --git a/app/workers/concerns/limited_capacity/worker.rb b/app/workers/concerns/limited_capacity/worker.rb
index b4cdfda680f..bcedb4efcc0 100644
--- a/app/workers/concerns/limited_capacity/worker.rb
+++ b/app/workers/concerns/limited_capacity/worker.rb
@@ -47,7 +47,7 @@ module LimitedCapacity
# would be occupied by a job that will be performed in the distant future.
# We let the cron worker enqueue new jobs, this could be seen as our retry and
# back off mechanism because the job might fail again if executed immediately.
- sidekiq_options retry: 0
+ sidekiq_options retry: 0, status_expiration: Gitlab::SidekiqStatus::DEFAULT_EXPIRATION
deduplicate :none
end
diff --git a/app/workers/concerns/new_issuable.rb b/app/workers/concerns/new_issuable.rb
index 482a74f49f7..d761f023cad 100644
--- a/app/workers/concerns/new_issuable.rb
+++ b/app/workers/concerns/new_issuable.rb
@@ -10,21 +10,17 @@ module NewIssuable
user && issuable
end
- # rubocop: disable CodeReuse/ActiveRecord
def set_user(user_id)
- @user = User.find_by(id: user_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables
+ @user = User.find_by_id(user_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables
log_error(User, user_id) unless @user # rubocop:disable Gitlab/ModuleWithInstanceVariables
end
- # rubocop: enable CodeReuse/ActiveRecord
- # rubocop: disable CodeReuse/ActiveRecord
def set_issuable(issuable_id)
- @issuable = issuable_class.find_by(id: issuable_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables
+ @issuable = issuable_class.find_by_id(issuable_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables
log_error(issuable_class, issuable_id) unless @issuable # rubocop:disable Gitlab/ModuleWithInstanceVariables
end
- # rubocop: enable CodeReuse/ActiveRecord
def log_error(record_class, record_id)
Gitlab::AppLogger.error("#{self.class}: couldn't find #{record_class} with ID=#{record_id}, skipping job")
diff --git a/app/workers/concerns/todos_destroyer_queue.rb b/app/workers/concerns/todos_destroyer_queue.rb
index 1bbccbfb1f9..1c31b64ad97 100644
--- a/app/workers/concerns/todos_destroyer_queue.rb
+++ b/app/workers/concerns/todos_destroyer_queue.rb
@@ -8,6 +8,6 @@ module TodosDestroyerQueue
included do
queue_namespace :todos_destroyer
- feature_category :issue_tracking
+ feature_category :team_planning
end
end