summaryrefslogtreecommitdiff
path: root/app/workers
diff options
context:
space:
mode:
authorSean McGivern <sean@mcgivern.me.uk>2017-12-14 10:15:52 +0000
committerSean McGivern <sean@mcgivern.me.uk>2017-12-14 10:15:52 +0000
commit5478fe4288b6f7d4e29caa643a12248376a3e4b9 (patch)
tree3bd77e516df470f110df51aaa45c72e9921669a6 /app/workers
parentaf110b89971752bb25c59f4853dd8633fda1ed1e (diff)
parent4a6ba82b28b0219a13dcb8732361193e9c9b3890 (diff)
downloadgitlab-ce-5478fe4288b6f7d4e29caa643a12248376a3e4b9.tar.gz
Merge branch 'dm-dedicated-sidekiq-queues' into 'master'
Use a dedicated queue for each Sidekiq worker See merge request gitlab-org/gitlab-ce!15882
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/all_queues.yml98
-rw-r--r--app/workers/build_finished_worker.rb2
-rw-r--r--app/workers/build_hooks_worker.rb2
-rw-r--r--app/workers/build_queue_worker.rb2
-rw-r--r--app/workers/build_success_worker.rb2
-rw-r--r--app/workers/concerns/application_worker.rb24
-rw-r--r--app/workers/concerns/cluster_queue.rb2
-rw-r--r--app/workers/concerns/cronjob_queue.rb3
-rw-r--r--app/workers/concerns/gitlab/github_import/queue.rb4
-rw-r--r--app/workers/concerns/pipeline_queue.rb10
-rw-r--r--app/workers/concerns/repository_check_queue.rb4
-rw-r--r--app/workers/create_pipeline_worker.rb2
-rw-r--r--app/workers/expire_job_cache_worker.rb2
-rw-r--r--app/workers/expire_pipeline_cache_worker.rb2
-rw-r--r--app/workers/gitlab/github_import/advance_stage_worker.rb2
-rw-r--r--app/workers/pages_worker.rb2
-rw-r--r--app/workers/pipeline_hooks_worker.rb2
-rw-r--r--app/workers/pipeline_process_worker.rb2
-rw-r--r--app/workers/pipeline_success_worker.rb2
-rw-r--r--app/workers/pipeline_update_worker.rb2
-rw-r--r--app/workers/stage_update_worker.rb2
-rw-r--r--app/workers/update_head_pipeline_for_merge_request_worker.rb3
22 files changed, 145 insertions, 31 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
new file mode 100644
index 00000000000..ba31a5aa9c2
--- /dev/null
+++ b/app/workers/all_queues.yml
@@ -0,0 +1,98 @@
+---
+- cronjob:admin_email
+- cronjob:expire_build_artifacts
+- cronjob:gitlab_usage_ping
+- cronjob:import_export_project_cleanup
+- cronjob:pipeline_schedule
+- cronjob:prune_old_events
+- cronjob:remove_expired_group_links
+- cronjob:remove_expired_members
+- cronjob:remove_old_web_hook_logs
+- cronjob:remove_unreferenced_lfs_objects
+- cronjob:repository_archive_cache
+- cronjob:repository_check_batch
+- cronjob:requests_profiles
+- cronjob:schedule_update_user_activity
+- cronjob:stuck_ci_jobs
+- cronjob:stuck_import_jobs
+- cronjob:stuck_merge_jobs
+- cronjob:trending_projects
+
+- gcp_cluster:cluster_install_app
+- gcp_cluster:cluster_provision
+- gcp_cluster:cluster_wait_for_app_installation
+- gcp_cluster:wait_for_cluster_creation
+
+- github_import_advance_stage
+- github_importer:github_import_import_diff_note
+- github_importer:github_import_import_issue
+- github_importer:github_import_import_note
+- github_importer:github_import_import_pull_request
+- github_importer:github_import_refresh_import_jid
+- github_importer:github_import_stage_finish_import
+- github_importer:github_import_stage_import_base_data
+- github_importer:github_import_stage_import_issues_and_diff_notes
+- github_importer:github_import_stage_import_notes
+- github_importer:github_import_stage_import_pull_requests
+- github_importer:github_import_stage_import_repository
+
+- pipeline_cache:expire_job_cache
+- pipeline_cache:expire_pipeline_cache
+- pipeline_creation:create_pipeline
+- pipeline_default:build_coverage
+- pipeline_default:build_trace_sections
+- pipeline_default:pipeline_metrics
+- pipeline_default:pipeline_notification
+- pipeline_default:update_head_pipeline_for_merge_request
+- pipeline_hooks:build_hooks
+- pipeline_hooks:pipeline_hooks
+- pipeline_processing:build_finished
+- pipeline_processing:build_queue
+- pipeline_processing:build_success
+- pipeline_processing:pipeline_process
+- pipeline_processing:pipeline_success
+- pipeline_processing:pipeline_update
+- pipeline_processing:stage_update
+
+- repository_check:repository_check_clear
+- repository_check:repository_check_single_repository
+
+- default
+- mailers # ActionMailer::DeliveryJob.queue_name
+
+- authorized_projects
+- background_migration
+- create_gpg_signature
+- delete_merged_branches
+- delete_user
+- email_receiver
+- emails_on_push
+- expire_build_instance_artifacts
+- git_garbage_collect
+- gitlab_shell
+- group_destroy
+- invalid_gpg_signature_update
+- irker
+- merge
+- namespaceless_project_destroy
+- new_issue
+- new_merge_request
+- new_note
+- pages
+- post_receive
+- process_commit
+- project_cache
+- project_destroy
+- project_export
+- project_migrate_hashed_storage
+- project_service
+- propagate_service_template
+- reactive_caching
+- repository_fork
+- repository_import
+- storage_migrator
+- system_hook_push
+- update_merge_requests
+- update_user_activity
+- upload_checksum
+- web_hook
diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb
index 5efa9180f5e..97d80305bec 100644
--- a/app/workers/build_finished_worker.rb
+++ b/app/workers/build_finished_worker.rb
@@ -2,7 +2,7 @@ class BuildFinishedWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb
index 6705a1c2709..cbfca8c342c 100644
--- a/app/workers/build_hooks_worker.rb
+++ b/app/workers/build_hooks_worker.rb
@@ -2,7 +2,7 @@ class BuildHooksWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :hooks
+ queue_namespace :pipeline_hooks
def perform(build_id)
Ci::Build.find_by(id: build_id)
diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb
index fc775a84dc0..e4f4e6c1d9e 100644
--- a/app/workers/build_queue_worker.rb
+++ b/app/workers/build_queue_worker.rb
@@ -2,7 +2,7 @@ class BuildQueueWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb
index ec049821ad7..4b9097bc5e4 100644
--- a/app/workers/build_success_worker.rb
+++ b/app/workers/build_success_worker.rb
@@ -2,7 +2,7 @@ class BuildSuccessWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb
index 9c3bdabc49e..37586e161c9 100644
--- a/app/workers/concerns/application_worker.rb
+++ b/app/workers/concerns/application_worker.rb
@@ -3,13 +3,23 @@ Sidekiq::Worker.extend ActiveSupport::Concern
module ApplicationWorker
extend ActiveSupport::Concern
- include Sidekiq::Worker
+ include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker
included do
- sidekiq_options queue: base_queue_name
+ set_queue
end
module ClassMethods
+ def inherited(subclass)
+ subclass.set_queue
+ end
+
+ def set_queue
+ queue_name = [queue_namespace, base_queue_name].compact.join(':')
+
+ sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue
+ end
+
def base_queue_name
name
.sub(/\AGitlab::/, '')
@@ -18,6 +28,16 @@ module ApplicationWorker
.tr('/', '_')
end
+ def queue_namespace(new_namespace = nil)
+ if new_namespace
+ sidekiq_options queue_namespace: new_namespace
+
+ set_queue
+ else
+ get_sidekiq_options['queue_namespace']&.to_s
+ end
+ end
+
def queue
get_sidekiq_options['queue'].to_s
end
diff --git a/app/workers/concerns/cluster_queue.rb b/app/workers/concerns/cluster_queue.rb
index a5074d13220..24b9f145220 100644
--- a/app/workers/concerns/cluster_queue.rb
+++ b/app/workers/concerns/cluster_queue.rb
@@ -5,6 +5,6 @@ module ClusterQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: :gcp_cluster
+ queue_namespace :gcp_cluster
end
end
diff --git a/app/workers/concerns/cronjob_queue.rb b/app/workers/concerns/cronjob_queue.rb
index e918bb011e0..b6581779f6a 100644
--- a/app/workers/concerns/cronjob_queue.rb
+++ b/app/workers/concerns/cronjob_queue.rb
@@ -4,6 +4,7 @@ module CronjobQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: :cronjob, retry: false
+ queue_namespace :cronjob
+ sidekiq_options retry: false
end
end
diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb
index a2bee361b86..22c2ce458e8 100644
--- a/app/workers/concerns/gitlab/github_import/queue.rb
+++ b/app/workers/concerns/gitlab/github_import/queue.rb
@@ -4,12 +4,14 @@ module Gitlab
extend ActiveSupport::Concern
included do
+ queue_namespace :github_importer
+
# If a job produces an error it may block a stage from advancing
# forever. To prevent this from happening we prevent jobs from going to
# the dead queue. This does mean some resources may not be imported, but
# this is better than a project being stuck in the "import" state
# forever.
- sidekiq_options queue: 'github_importer', dead: false, retry: 5
+ sidekiq_options dead: false, retry: 5
end
end
end
diff --git a/app/workers/concerns/pipeline_queue.rb b/app/workers/concerns/pipeline_queue.rb
index ddf45b91345..e77093a6902 100644
--- a/app/workers/concerns/pipeline_queue.rb
+++ b/app/workers/concerns/pipeline_queue.rb
@@ -5,14 +5,6 @@ module PipelineQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: 'pipeline_default'
- end
-
- class_methods do
- def enqueue_in(group:)
- raise ArgumentError, 'Unspecified queue group!' if group.empty?
-
- sidekiq_options queue: "pipeline_#{group}"
- end
+ queue_namespace :pipeline_default
end
end
diff --git a/app/workers/concerns/repository_check_queue.rb b/app/workers/concerns/repository_check_queue.rb
index a597321ccf4..43fb66c31b0 100644
--- a/app/workers/concerns/repository_check_queue.rb
+++ b/app/workers/concerns/repository_check_queue.rb
@@ -3,6 +3,8 @@ module RepositoryCheckQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: :repository_check, retry: false
+ queue_namespace :repository_check
+
+ sidekiq_options retry: false
end
end
diff --git a/app/workers/create_pipeline_worker.rb b/app/workers/create_pipeline_worker.rb
index 00cd7b85b9f..c3ac35e54f5 100644
--- a/app/workers/create_pipeline_worker.rb
+++ b/app/workers/create_pipeline_worker.rb
@@ -2,7 +2,7 @@ class CreatePipelineWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :creation
+ queue_namespace :pipeline_creation
def perform(project_id, user_id, ref, source, params = {})
project = Project.find(project_id)
diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb
index a591e2da519..7217364a9f2 100644
--- a/app/workers/expire_job_cache_worker.rb
+++ b/app/workers/expire_job_cache_worker.rb
@@ -2,7 +2,7 @@ class ExpireJobCacheWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :cache
+ queue_namespace :pipeline_cache
def perform(job_id)
job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id)
diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb
index a3ac32b437d..3e34de22c19 100644
--- a/app/workers/expire_pipeline_cache_worker.rb
+++ b/app/workers/expire_pipeline_cache_worker.rb
@@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :cache
+ queue_namespace :pipeline_cache
def perform(pipeline_id)
pipeline = Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb
index 400396d5755..f7f498af840 100644
--- a/app/workers/gitlab/github_import/advance_stage_worker.rb
+++ b/app/workers/gitlab/github_import/advance_stage_worker.rb
@@ -9,7 +9,7 @@ module Gitlab
class AdvanceStageWorker
include ApplicationWorker
- sidekiq_options queue: 'github_importer_advance_stage', dead: false
+ sidekiq_options dead: false
INTERVAL = 30.seconds.to_i
diff --git a/app/workers/pages_worker.rb b/app/workers/pages_worker.rb
index 62f733c02fc..3ec81d040b4 100644
--- a/app/workers/pages_worker.rb
+++ b/app/workers/pages_worker.rb
@@ -1,7 +1,7 @@
class PagesWorker
include ApplicationWorker
- sidekiq_options queue: :pages, retry: false
+ sidekiq_options retry: false
def perform(action, *arg)
send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb
index 661c29efe88..c94918ff4ee 100644
--- a/app/workers/pipeline_hooks_worker.rb
+++ b/app/workers/pipeline_hooks_worker.rb
@@ -2,7 +2,7 @@ class PipelineHooksWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :hooks
+ queue_namespace :pipeline_hooks
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb
index 07dbf6a971e..24424b3f472 100644
--- a/app/workers/pipeline_process_worker.rb
+++ b/app/workers/pipeline_process_worker.rb
@@ -2,7 +2,7 @@ class PipelineProcessWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/pipeline_success_worker.rb b/app/workers/pipeline_success_worker.rb
index 68c40a259e1..2ab0739a17f 100644
--- a/app/workers/pipeline_success_worker.rb
+++ b/app/workers/pipeline_success_worker.rb
@@ -2,7 +2,7 @@ class PipelineSuccessWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb
index 24a8a9fbed5..fc9da2d45b1 100644
--- a/app/workers/pipeline_update_worker.rb
+++ b/app/workers/pipeline_update_worker.rb
@@ -2,7 +2,7 @@ class PipelineUpdateWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/stage_update_worker.rb b/app/workers/stage_update_worker.rb
index 69f2318d83b..e4b683fca33 100644
--- a/app/workers/stage_update_worker.rb
+++ b/app/workers/stage_update_worker.rb
@@ -2,7 +2,7 @@ class StageUpdateWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(stage_id)
Ci::Stage.find_by(id: stage_id).try do |stage|
diff --git a/app/workers/update_head_pipeline_for_merge_request_worker.rb b/app/workers/update_head_pipeline_for_merge_request_worker.rb
index 68c71a2b7a7..f09d89aa170 100644
--- a/app/workers/update_head_pipeline_for_merge_request_worker.rb
+++ b/app/workers/update_head_pipeline_for_merge_request_worker.rb
@@ -1,7 +1,6 @@
class UpdateHeadPipelineForMergeRequestWorker
include ApplicationWorker
-
- sidekiq_options queue: 'pipeline_default'
+ include PipelineQueue
def perform(merge_request_id)
merge_request = MergeRequest.find(merge_request_id)