summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSean McGivern <sean@mcgivern.me.uk>2017-12-14 10:15:52 +0000
committerSean McGivern <sean@mcgivern.me.uk>2017-12-14 10:15:52 +0000
commit5478fe4288b6f7d4e29caa643a12248376a3e4b9 (patch)
tree3bd77e516df470f110df51aaa45c72e9921669a6
parentaf110b89971752bb25c59f4853dd8633fda1ed1e (diff)
parent4a6ba82b28b0219a13dcb8732361193e9c9b3890 (diff)
downloadgitlab-ce-5478fe4288b6f7d4e29caa643a12248376a3e4b9.tar.gz
Merge branch 'dm-dedicated-sidekiq-queues' into 'master'
Use a dedicated queue for each Sidekiq worker See merge request gitlab-org/gitlab-ce!15882
-rw-r--r--app/workers/all_queues.yml98
-rw-r--r--app/workers/build_finished_worker.rb2
-rw-r--r--app/workers/build_hooks_worker.rb2
-rw-r--r--app/workers/build_queue_worker.rb2
-rw-r--r--app/workers/build_success_worker.rb2
-rw-r--r--app/workers/concerns/application_worker.rb24
-rw-r--r--app/workers/concerns/cluster_queue.rb2
-rw-r--r--app/workers/concerns/cronjob_queue.rb3
-rw-r--r--app/workers/concerns/gitlab/github_import/queue.rb4
-rw-r--r--app/workers/concerns/pipeline_queue.rb10
-rw-r--r--app/workers/concerns/repository_check_queue.rb4
-rw-r--r--app/workers/create_pipeline_worker.rb2
-rw-r--r--app/workers/expire_job_cache_worker.rb2
-rw-r--r--app/workers/expire_pipeline_cache_worker.rb2
-rw-r--r--app/workers/gitlab/github_import/advance_stage_worker.rb2
-rw-r--r--app/workers/pages_worker.rb2
-rw-r--r--app/workers/pipeline_hooks_worker.rb2
-rw-r--r--app/workers/pipeline_process_worker.rb2
-rw-r--r--app/workers/pipeline_success_worker.rb2
-rw-r--r--app/workers/pipeline_update_worker.rb2
-rw-r--r--app/workers/stage_update_worker.rb2
-rw-r--r--app/workers/update_head_pipeline_for_merge_request_worker.rb3
-rw-r--r--config/initializers/sidekiq.rb18
-rw-r--r--config/sidekiq_queues.yml5
-rw-r--r--db/post_migrate/20171213160445_migrate_github_importer_advance_stage_sidekiq_queue.rb16
-rw-r--r--db/schema.rb2
-rw-r--r--doc/development/sidekiq_style_guide.md59
-rw-r--r--lib/gitlab/sidekiq_config.rb37
-rw-r--r--lib/gitlab/sidekiq_versioning.rb25
-rw-r--r--lib/gitlab/sidekiq_versioning/manager.rb12
-rw-r--r--rubocop/cop/include_sidekiq_worker.rb29
-rw-r--r--rubocop/cop/sidekiq_options_queue.rb27
-rw-r--r--rubocop/rubocop.rb2
-rw-r--r--spec/lib/gitlab/sidekiq_config_spec.rb23
-rw-r--r--spec/lib/gitlab/sidekiq_versioning/manager_spec.rb22
-rw-r--r--spec/lib/gitlab/sidekiq_versioning_spec.rb44
-rw-r--r--spec/rubocop/cop/include_sidekiq_worker_spec.rb31
-rw-r--r--spec/rubocop/cop/sidekiq_options_queue_spec.rb26
-rw-r--r--spec/workers/concerns/application_worker_spec.rb8
-rw-r--r--spec/workers/concerns/cluster_queue_spec.rb2
-rw-r--r--spec/workers/concerns/cronjob_queue_spec.rb2
-rw-r--r--spec/workers/concerns/gitlab/github_import/queue_spec.rb2
-rw-r--r--spec/workers/concerns/pipeline_queue_spec.rb11
-rw-r--r--spec/workers/concerns/repository_check_queue_spec.rb2
-rw-r--r--spec/workers/every_sidekiq_worker_spec.rb31
45 files changed, 510 insertions, 102 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
new file mode 100644
index 00000000000..ba31a5aa9c2
--- /dev/null
+++ b/app/workers/all_queues.yml
@@ -0,0 +1,98 @@
+---
+- cronjob:admin_email
+- cronjob:expire_build_artifacts
+- cronjob:gitlab_usage_ping
+- cronjob:import_export_project_cleanup
+- cronjob:pipeline_schedule
+- cronjob:prune_old_events
+- cronjob:remove_expired_group_links
+- cronjob:remove_expired_members
+- cronjob:remove_old_web_hook_logs
+- cronjob:remove_unreferenced_lfs_objects
+- cronjob:repository_archive_cache
+- cronjob:repository_check_batch
+- cronjob:requests_profiles
+- cronjob:schedule_update_user_activity
+- cronjob:stuck_ci_jobs
+- cronjob:stuck_import_jobs
+- cronjob:stuck_merge_jobs
+- cronjob:trending_projects
+
+- gcp_cluster:cluster_install_app
+- gcp_cluster:cluster_provision
+- gcp_cluster:cluster_wait_for_app_installation
+- gcp_cluster:wait_for_cluster_creation
+
+- github_import_advance_stage
+- github_importer:github_import_import_diff_note
+- github_importer:github_import_import_issue
+- github_importer:github_import_import_note
+- github_importer:github_import_import_pull_request
+- github_importer:github_import_refresh_import_jid
+- github_importer:github_import_stage_finish_import
+- github_importer:github_import_stage_import_base_data
+- github_importer:github_import_stage_import_issues_and_diff_notes
+- github_importer:github_import_stage_import_notes
+- github_importer:github_import_stage_import_pull_requests
+- github_importer:github_import_stage_import_repository
+
+- pipeline_cache:expire_job_cache
+- pipeline_cache:expire_pipeline_cache
+- pipeline_creation:create_pipeline
+- pipeline_default:build_coverage
+- pipeline_default:build_trace_sections
+- pipeline_default:pipeline_metrics
+- pipeline_default:pipeline_notification
+- pipeline_default:update_head_pipeline_for_merge_request
+- pipeline_hooks:build_hooks
+- pipeline_hooks:pipeline_hooks
+- pipeline_processing:build_finished
+- pipeline_processing:build_queue
+- pipeline_processing:build_success
+- pipeline_processing:pipeline_process
+- pipeline_processing:pipeline_success
+- pipeline_processing:pipeline_update
+- pipeline_processing:stage_update
+
+- repository_check:repository_check_clear
+- repository_check:repository_check_single_repository
+
+- default
+- mailers # ActionMailer::DeliveryJob.queue_name
+
+- authorized_projects
+- background_migration
+- create_gpg_signature
+- delete_merged_branches
+- delete_user
+- email_receiver
+- emails_on_push
+- expire_build_instance_artifacts
+- git_garbage_collect
+- gitlab_shell
+- group_destroy
+- invalid_gpg_signature_update
+- irker
+- merge
+- namespaceless_project_destroy
+- new_issue
+- new_merge_request
+- new_note
+- pages
+- post_receive
+- process_commit
+- project_cache
+- project_destroy
+- project_export
+- project_migrate_hashed_storage
+- project_service
+- propagate_service_template
+- reactive_caching
+- repository_fork
+- repository_import
+- storage_migrator
+- system_hook_push
+- update_merge_requests
+- update_user_activity
+- upload_checksum
+- web_hook
diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb
index 5efa9180f5e..97d80305bec 100644
--- a/app/workers/build_finished_worker.rb
+++ b/app/workers/build_finished_worker.rb
@@ -2,7 +2,7 @@ class BuildFinishedWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb
index 6705a1c2709..cbfca8c342c 100644
--- a/app/workers/build_hooks_worker.rb
+++ b/app/workers/build_hooks_worker.rb
@@ -2,7 +2,7 @@ class BuildHooksWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :hooks
+ queue_namespace :pipeline_hooks
def perform(build_id)
Ci::Build.find_by(id: build_id)
diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb
index fc775a84dc0..e4f4e6c1d9e 100644
--- a/app/workers/build_queue_worker.rb
+++ b/app/workers/build_queue_worker.rb
@@ -2,7 +2,7 @@ class BuildQueueWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb
index ec049821ad7..4b9097bc5e4 100644
--- a/app/workers/build_success_worker.rb
+++ b/app/workers/build_success_worker.rb
@@ -2,7 +2,7 @@ class BuildSuccessWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb
index 9c3bdabc49e..37586e161c9 100644
--- a/app/workers/concerns/application_worker.rb
+++ b/app/workers/concerns/application_worker.rb
@@ -3,13 +3,23 @@ Sidekiq::Worker.extend ActiveSupport::Concern
module ApplicationWorker
extend ActiveSupport::Concern
- include Sidekiq::Worker
+ include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker
included do
- sidekiq_options queue: base_queue_name
+ set_queue
end
module ClassMethods
+ def inherited(subclass)
+ subclass.set_queue
+ end
+
+ def set_queue
+ queue_name = [queue_namespace, base_queue_name].compact.join(':')
+
+ sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue
+ end
+
def base_queue_name
name
.sub(/\AGitlab::/, '')
@@ -18,6 +28,16 @@ module ApplicationWorker
.tr('/', '_')
end
+ def queue_namespace(new_namespace = nil)
+ if new_namespace
+ sidekiq_options queue_namespace: new_namespace
+
+ set_queue
+ else
+ get_sidekiq_options['queue_namespace']&.to_s
+ end
+ end
+
def queue
get_sidekiq_options['queue'].to_s
end
diff --git a/app/workers/concerns/cluster_queue.rb b/app/workers/concerns/cluster_queue.rb
index a5074d13220..24b9f145220 100644
--- a/app/workers/concerns/cluster_queue.rb
+++ b/app/workers/concerns/cluster_queue.rb
@@ -5,6 +5,6 @@ module ClusterQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: :gcp_cluster
+ queue_namespace :gcp_cluster
end
end
diff --git a/app/workers/concerns/cronjob_queue.rb b/app/workers/concerns/cronjob_queue.rb
index e918bb011e0..b6581779f6a 100644
--- a/app/workers/concerns/cronjob_queue.rb
+++ b/app/workers/concerns/cronjob_queue.rb
@@ -4,6 +4,7 @@ module CronjobQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: :cronjob, retry: false
+ queue_namespace :cronjob
+ sidekiq_options retry: false
end
end
diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb
index a2bee361b86..22c2ce458e8 100644
--- a/app/workers/concerns/gitlab/github_import/queue.rb
+++ b/app/workers/concerns/gitlab/github_import/queue.rb
@@ -4,12 +4,14 @@ module Gitlab
extend ActiveSupport::Concern
included do
+ queue_namespace :github_importer
+
# If a job produces an error it may block a stage from advancing
# forever. To prevent this from happening we prevent jobs from going to
# the dead queue. This does mean some resources may not be imported, but
# this is better than a project being stuck in the "import" state
# forever.
- sidekiq_options queue: 'github_importer', dead: false, retry: 5
+ sidekiq_options dead: false, retry: 5
end
end
end
diff --git a/app/workers/concerns/pipeline_queue.rb b/app/workers/concerns/pipeline_queue.rb
index ddf45b91345..e77093a6902 100644
--- a/app/workers/concerns/pipeline_queue.rb
+++ b/app/workers/concerns/pipeline_queue.rb
@@ -5,14 +5,6 @@ module PipelineQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: 'pipeline_default'
- end
-
- class_methods do
- def enqueue_in(group:)
- raise ArgumentError, 'Unspecified queue group!' if group.empty?
-
- sidekiq_options queue: "pipeline_#{group}"
- end
+ queue_namespace :pipeline_default
end
end
diff --git a/app/workers/concerns/repository_check_queue.rb b/app/workers/concerns/repository_check_queue.rb
index a597321ccf4..43fb66c31b0 100644
--- a/app/workers/concerns/repository_check_queue.rb
+++ b/app/workers/concerns/repository_check_queue.rb
@@ -3,6 +3,8 @@ module RepositoryCheckQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: :repository_check, retry: false
+ queue_namespace :repository_check
+
+ sidekiq_options retry: false
end
end
diff --git a/app/workers/create_pipeline_worker.rb b/app/workers/create_pipeline_worker.rb
index 00cd7b85b9f..c3ac35e54f5 100644
--- a/app/workers/create_pipeline_worker.rb
+++ b/app/workers/create_pipeline_worker.rb
@@ -2,7 +2,7 @@ class CreatePipelineWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :creation
+ queue_namespace :pipeline_creation
def perform(project_id, user_id, ref, source, params = {})
project = Project.find(project_id)
diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb
index a591e2da519..7217364a9f2 100644
--- a/app/workers/expire_job_cache_worker.rb
+++ b/app/workers/expire_job_cache_worker.rb
@@ -2,7 +2,7 @@ class ExpireJobCacheWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :cache
+ queue_namespace :pipeline_cache
def perform(job_id)
job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id)
diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb
index a3ac32b437d..3e34de22c19 100644
--- a/app/workers/expire_pipeline_cache_worker.rb
+++ b/app/workers/expire_pipeline_cache_worker.rb
@@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :cache
+ queue_namespace :pipeline_cache
def perform(pipeline_id)
pipeline = Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb
index 400396d5755..f7f498af840 100644
--- a/app/workers/gitlab/github_import/advance_stage_worker.rb
+++ b/app/workers/gitlab/github_import/advance_stage_worker.rb
@@ -9,7 +9,7 @@ module Gitlab
class AdvanceStageWorker
include ApplicationWorker
- sidekiq_options queue: 'github_importer_advance_stage', dead: false
+ sidekiq_options dead: false
INTERVAL = 30.seconds.to_i
diff --git a/app/workers/pages_worker.rb b/app/workers/pages_worker.rb
index 62f733c02fc..3ec81d040b4 100644
--- a/app/workers/pages_worker.rb
+++ b/app/workers/pages_worker.rb
@@ -1,7 +1,7 @@
class PagesWorker
include ApplicationWorker
- sidekiq_options queue: :pages, retry: false
+ sidekiq_options retry: false
def perform(action, *arg)
send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb
index 661c29efe88..c94918ff4ee 100644
--- a/app/workers/pipeline_hooks_worker.rb
+++ b/app/workers/pipeline_hooks_worker.rb
@@ -2,7 +2,7 @@ class PipelineHooksWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :hooks
+ queue_namespace :pipeline_hooks
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb
index 07dbf6a971e..24424b3f472 100644
--- a/app/workers/pipeline_process_worker.rb
+++ b/app/workers/pipeline_process_worker.rb
@@ -2,7 +2,7 @@ class PipelineProcessWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/pipeline_success_worker.rb b/app/workers/pipeline_success_worker.rb
index 68c40a259e1..2ab0739a17f 100644
--- a/app/workers/pipeline_success_worker.rb
+++ b/app/workers/pipeline_success_worker.rb
@@ -2,7 +2,7 @@ class PipelineSuccessWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb
index 24a8a9fbed5..fc9da2d45b1 100644
--- a/app/workers/pipeline_update_worker.rb
+++ b/app/workers/pipeline_update_worker.rb
@@ -2,7 +2,7 @@ class PipelineUpdateWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/stage_update_worker.rb b/app/workers/stage_update_worker.rb
index 69f2318d83b..e4b683fca33 100644
--- a/app/workers/stage_update_worker.rb
+++ b/app/workers/stage_update_worker.rb
@@ -2,7 +2,7 @@ class StageUpdateWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(stage_id)
Ci::Stage.find_by(id: stage_id).try do |stage|
diff --git a/app/workers/update_head_pipeline_for_merge_request_worker.rb b/app/workers/update_head_pipeline_for_merge_request_worker.rb
index 68c71a2b7a7..f09d89aa170 100644
--- a/app/workers/update_head_pipeline_for_merge_request_worker.rb
+++ b/app/workers/update_head_pipeline_for_merge_request_worker.rb
@@ -1,7 +1,6 @@
class UpdateHeadPipelineForMergeRequestWorker
include ApplicationWorker
-
- sidekiq_options queue: 'pipeline_default'
+ include PipelineQueue
def perform(merge_request_id)
merge_request = MergeRequest.find(merge_request_id)
diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb
index ba4481ae602..0f164e628f9 100644
--- a/config/initializers/sidekiq.rb
+++ b/config/initializers/sidekiq.rb
@@ -42,6 +42,8 @@ Sidekiq.configure_server do |config|
Gitlab::SidekiqThrottler.execute!
+ Gitlab::SidekiqVersioning.install!
+
config = Gitlab::Database.config ||
Rails.application.config.database_configuration[Rails.env]
config['pool'] = Sidekiq.options[:concurrency]
@@ -60,19 +62,3 @@ Sidekiq.configure_client do |config|
chain.add Gitlab::SidekiqStatus::ClientMiddleware
end
end
-
-# The Sidekiq client API always adds the queue to the Sidekiq queue
-# list, but mail_room and gitlab-shell do not. This is only necessary
-# for monitoring.
-begin
- queues = Gitlab::SidekiqConfig.worker_queues
-
- Sidekiq.redis do |conn|
- conn.pipelined do
- queues.each do |queue|
- conn.sadd('queues', queue)
- end
- end
- end
-rescue Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED
-end
diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml
index bc7c431731a..31a38f2b508 100644
--- a/config/sidekiq_queues.yml
+++ b/config/sidekiq_queues.yml
@@ -25,8 +25,6 @@
- [new_note, 2]
- [new_issue, 2]
- [new_merge_request, 2]
- - [build, 2]
- - [pipeline, 2]
- [pipeline_processing, 5]
- [pipeline_creation, 4]
- [pipeline_default, 3]
@@ -38,11 +36,12 @@
- [mailers, 2]
- [invalid_gpg_signature_update, 2]
- [create_gpg_signature, 2]
+ - [rebase, 2]
- [upload_checksum, 1]
- [repository_fork, 1]
- [repository_import, 1]
- [github_importer, 1]
- - [github_importer_advance_stage, 1]
+ - [github_import_advance_stage, 1]
- [project_service, 1]
- [delete_user, 1]
- [delete_merged_branches, 1]
diff --git a/db/post_migrate/20171213160445_migrate_github_importer_advance_stage_sidekiq_queue.rb b/db/post_migrate/20171213160445_migrate_github_importer_advance_stage_sidekiq_queue.rb
new file mode 100644
index 00000000000..149c28f1946
--- /dev/null
+++ b/db/post_migrate/20171213160445_migrate_github_importer_advance_stage_sidekiq_queue.rb
@@ -0,0 +1,16 @@
+# See http://doc.gitlab.com/ce/development/migration_style_guide.html
+# for more information on how to write migrations for GitLab.
+
+class MigrateGithubImporterAdvanceStageSidekiqQueue < ActiveRecord::Migration
+ include Gitlab::Database::MigrationHelpers
+
+ DOWNTIME = false
+
+ def up
+ sidekiq_queue_migrate 'github_importer_advance_stage', to: 'github_import_advance_stage'
+ end
+
+ def down
+ sidekiq_queue_migrate 'github_import_advance_stage', to: 'github_importer_advance_stage'
+ end
+end
diff --git a/db/schema.rb b/db/schema.rb
index f0b1da16d53..2048c50f892 100644
--- a/db/schema.rb
+++ b/db/schema.rb
@@ -11,7 +11,7 @@
#
# It's strongly recommended that you check this file into your version control system.
-ActiveRecord::Schema.define(version: 20171206221519) do
+ActiveRecord::Schema.define(version: 20171213160445) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
diff --git a/doc/development/sidekiq_style_guide.md b/doc/development/sidekiq_style_guide.md
index 085fb8f902c..59ebf41e09f 100644
--- a/doc/development/sidekiq_style_guide.md
+++ b/doc/development/sidekiq_style_guide.md
@@ -9,25 +9,54 @@ All workers should include `ApplicationWorker` instead of `Sidekiq::Worker`,
which adds some convenience methods and automatically sets the queue based on
the worker's name.
-## Default Queue
+## Dedicated Queues
-Use of the "default" queue is not allowed. Every worker should use a queue that
-matches the worker's purpose the closest. For example, workers that are to be
-executed periodically should use the "cronjob" queue.
+All workers should use their own queue, which is automatically set based on the
+worker class name. For a worker named `ProcessSomethingWorker`, the queue name
+would be `process_something`. If you're not sure what queue a worker uses,
+you can find it using `SomeWorker.queue`. There is almost never a reason to
+manually override the queue name using `sidekiq_options queue: :some_queue`.
-A list of all available queues can be found in `config/sidekiq_queues.yml`.
+## Queue Namespaces
-## Dedicated Queues
+While different workers cannot share a queue, they can share a queue namespace.
-Most workers should use their own queue, which is automatically set based on the
-worker class name. For a worker named `ProcessSomethingWorker`, the queue name
-would be `process_something`. If you're not sure what a worker's queue name is,
-you can find it using `SomeWorker.queue`.
+Defining a queue namespace for a worker makes it possible to start a Sidekiq
+process that automatically handles jobs for all workers in that namespace,
+without needing to explicitly list all their queue names. If, for example, all
+workers that are managed by sidekiq-cron use the `cronjob` queue namespace, we
+can spin up a Sidekiq process specifically for these kinds of scheduled jobs.
+If a new worker using the `cronjob` namespace is added later on, the Sidekiq
+process will automatically pick up jobs for that worker too (after having been
+restarted), without the need to change any configuration.
+
+A queue namespace can be set using the `queue_namespace` DSL class method:
+
+```ruby
+class SomeScheduledTaskWorker
+ include ApplicationWorker
+
+ queue_namespace :cronjob
+
+ # ...
+end
+```
+
+Behind the scenes, this will set `SomeScheduledTaskWorker.queue` to
+`cronjob:some_scheduled_task`. Commonly used namespaces will have their own
+concern module that can easily be included into the worker class, and that may
+set other Sidekiq options besides the queue namespace. `CronjobQueue`, for
+example, sets the namespace, but also disables retries.
+
+`bundle exec sidekiq` is namespace-aware, and will automatically listen on all
+queues in a namespace (technically: all queues prefixed with the namespace name)
+when a namespace is provided instead of a simple queue name in the `--queue`
+(`-q`) option, or in the `:queues:` section in `config/sidekiq_queues.yml`.
-In some cases multiple workers do use the same queue. For example, the various
-workers for updating CI pipelines all use the `pipeline` queue. Adding workers
-to existing queues should be done with care, as adding more workers can lead to
-slow jobs blocking work (even for different jobs) on the shared queue.
+Note that adding a worker to an existing namespace should be done with care, as
+the extra jobs will take resources away from jobs from workers that were already
+there, if the resources available to the Sidekiq process handling the namespace
+are not adjusted appropriately.
## Tests
@@ -36,7 +65,7 @@ tests should be placed in `spec/workers`.
## Removing or renaming queues
-Try to avoid renaming or removing queues in minor and patch releases.
+Try to avoid renaming or removing workers and their queues in minor and patch releases.
During online update instance can have pending jobs and removing the queue can
lead to those jobs being stuck forever. If you can't write migration for those
Sidekiq jobs, please consider doing rename or remove queue in major release only.
diff --git a/lib/gitlab/sidekiq_config.rb b/lib/gitlab/sidekiq_config.rb
index dc9886732b5..c3d7814551c 100644
--- a/lib/gitlab/sidekiq_config.rb
+++ b/lib/gitlab/sidekiq_config.rb
@@ -1,16 +1,35 @@
require 'yaml'
+require 'set'
module Gitlab
module SidekiqConfig
- def self.redis_queues
- @redis_queues ||= Sidekiq::Queue.all.map(&:name)
+ # This method is called by `bin/sidekiq-cluster` in EE, which runs outside
+ # of bundler/Rails context, so we cannot use any gem or Rails methods.
+ def self.worker_queues(rails_path = Rails.root.to_s)
+ @worker_queues ||= {}
+ @worker_queues[rails_path] ||= YAML.load_file(File.join(rails_path, 'app/workers/all_queues.yml'))
end
# This method is called by `bin/sidekiq-cluster` in EE, which runs outside
# of bundler/Rails context, so we cannot use any gem or Rails methods.
- def self.config_queues(rails_path = Rails.root.to_s)
+ def self.expand_queues(queues, all_queues = self.worker_queues)
+ return [] if queues.empty?
+
+ queues_set = all_queues.to_set
+
+ queues.flat_map do |queue|
+ [queue, *queues_set.grep(/\A#{queue}:/)]
+ end
+ end
+
+ def self.redis_queues
+ # Not memoized, because this can change during the life of the application
+ Sidekiq::Queue.all.map(&:name)
+ end
+
+ def self.config_queues
@config_queues ||= begin
- config = YAML.load_file(File.join(rails_path, 'config', 'sidekiq_queues.yml'))
+ config = YAML.load_file(Rails.root.join('config/sidekiq_queues.yml'))
config[:queues].map(&:first)
end
end
@@ -23,14 +42,6 @@ module Gitlab
@workers ||= find_workers(Rails.root.join('app', 'workers'))
end
- def self.default_queues
- [ActionMailer::DeliveryJob.queue_name, 'default']
- end
-
- def self.worker_queues
- @worker_queues ||= (workers.map(&:queue) + default_queues).uniq
- end
-
def self.find_workers(root)
concerns = root.join('concerns').to_s
@@ -43,7 +54,7 @@ module Gitlab
ns.camelize.constantize
end
- # Skip concerns
+ # Skip things that aren't workers
workers.select { |w| w < Sidekiq::Worker }
end
end
diff --git a/lib/gitlab/sidekiq_versioning.rb b/lib/gitlab/sidekiq_versioning.rb
new file mode 100644
index 00000000000..9683214ec18
--- /dev/null
+++ b/lib/gitlab/sidekiq_versioning.rb
@@ -0,0 +1,25 @@
+module Gitlab
+ module SidekiqVersioning
+ def self.install!
+ Sidekiq::Manager.prepend SidekiqVersioning::Manager
+
+ # The Sidekiq client API always adds the queue to the Sidekiq queue
+ # list, but mail_room and gitlab-shell do not. This is only necessary
+ # for monitoring.
+ begin
+ queues = SidekiqConfig.worker_queues
+
+ if queues.any?
+ Sidekiq.redis do |conn|
+ conn.pipelined do
+ queues.each do |queue|
+ conn.sadd('queues', queue)
+ end
+ end
+ end
+ end
+ rescue ::Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_versioning/manager.rb b/lib/gitlab/sidekiq_versioning/manager.rb
new file mode 100644
index 00000000000..308be0fdf76
--- /dev/null
+++ b/lib/gitlab/sidekiq_versioning/manager.rb
@@ -0,0 +1,12 @@
+module Gitlab
+ module SidekiqVersioning
+ module Manager
+ def initialize(options = {})
+ options[:strict] = false
+ options[:queues] = SidekiqConfig.expand_queues(options[:queues])
+ Sidekiq.logger.info "Listening on queues #{options[:queues].uniq.sort}"
+ super
+ end
+ end
+ end
+end
diff --git a/rubocop/cop/include_sidekiq_worker.rb b/rubocop/cop/include_sidekiq_worker.rb
new file mode 100644
index 00000000000..4a6332286a2
--- /dev/null
+++ b/rubocop/cop/include_sidekiq_worker.rb
@@ -0,0 +1,29 @@
+require_relative '../spec_helpers'
+
+module RuboCop
+ module Cop
+ # Cop that makes sure workers include `ApplicationWorker`, not `Sidekiq::Worker`.
+ class IncludeSidekiqWorker < RuboCop::Cop::Cop
+ include SpecHelpers
+
+ MSG = 'Include `ApplicationWorker`, not `Sidekiq::Worker`.'.freeze
+
+ def_node_matcher :includes_sidekiq_worker?, <<~PATTERN
+ (send nil :include (const (const nil :Sidekiq) :Worker))
+ PATTERN
+
+ def on_send(node)
+ return if in_spec?(node)
+ return unless includes_sidekiq_worker?(node)
+
+ add_offense(node.arguments.first, :expression)
+ end
+
+ def autocorrect(node)
+ lambda do |corrector|
+ corrector.replace(node.source_range, 'ApplicationWorker')
+ end
+ end
+ end
+ end
+end
diff --git a/rubocop/cop/sidekiq_options_queue.rb b/rubocop/cop/sidekiq_options_queue.rb
new file mode 100644
index 00000000000..43b35ba0214
--- /dev/null
+++ b/rubocop/cop/sidekiq_options_queue.rb
@@ -0,0 +1,27 @@
+require_relative '../spec_helpers'
+
+module RuboCop
+ module Cop
+ # Cop that prevents manually setting a queue in Sidekiq workers.
+ class SidekiqOptionsQueue < RuboCop::Cop::Cop
+ include SpecHelpers
+
+ MSG = 'Do not manually set a queue; `ApplicationWorker` sets one automatically.'.freeze
+
+ def_node_matcher :sidekiq_options?, <<~PATTERN
+ (send nil :sidekiq_options $...)
+ PATTERN
+
+ def on_send(node)
+ return if in_spec?(node)
+ return unless sidekiq_options?(node)
+
+ node.arguments.first.each_node(:pair) do |pair|
+ key_name = pair.key.children[0]
+
+ add_offense(pair, :expression) if key_name == :queue
+ end
+ end
+ end
+ end
+end
diff --git a/rubocop/rubocop.rb b/rubocop/rubocop.rb
index eb52be3d731..3e3b4c8349a 100644
--- a/rubocop/rubocop.rb
+++ b/rubocop/rubocop.rb
@@ -3,10 +3,12 @@ require_relative 'cop/active_record_serialize'
require_relative 'cop/custom_error_class'
require_relative 'cop/gem_fetcher'
require_relative 'cop/in_batches'
+require_relative 'cop/include_sidekiq_worker'
require_relative 'cop/line_break_after_guard_clauses'
require_relative 'cop/polymorphic_associations'
require_relative 'cop/project_path_helper'
require_relative 'cop/redirect_with_status'
+require_relative 'cop/sidekiq_options_queue'
require_relative 'cop/migration/add_column'
require_relative 'cop/migration/add_concurrent_foreign_key'
require_relative 'cop/migration/add_concurrent_index'
diff --git a/spec/lib/gitlab/sidekiq_config_spec.rb b/spec/lib/gitlab/sidekiq_config_spec.rb
index 09f95be2213..0c66d764851 100644
--- a/spec/lib/gitlab/sidekiq_config_spec.rb
+++ b/spec/lib/gitlab/sidekiq_config_spec.rb
@@ -16,9 +16,30 @@ describe Gitlab::SidekiqConfig do
expect(queues).to include('post_receive')
expect(queues).to include('merge')
- expect(queues).to include('cronjob')
+ expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('mailers')
expect(queues).to include('default')
end
end
+
+ describe '.expand_queues' do
+ it 'expands queue namespaces to concrete queue names' do
+ queues = described_class.expand_queues(%w[cronjob])
+
+ expect(queues).to include('cronjob:stuck_import_jobs')
+ expect(queues).to include('cronjob:stuck_merge_jobs')
+ end
+
+ it 'lets concrete queue names pass through' do
+ queues = described_class.expand_queues(%w[post_receive])
+
+ expect(queues).to include('post_receive')
+ end
+
+ it 'lets unknown queues pass through' do
+ queues = described_class.expand_queues(%w[unknown])
+
+ expect(queues).to include('unknown')
+ end
+ end
end
diff --git a/spec/lib/gitlab/sidekiq_versioning/manager_spec.rb b/spec/lib/gitlab/sidekiq_versioning/manager_spec.rb
new file mode 100644
index 00000000000..7debf70a16f
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_versioning/manager_spec.rb
@@ -0,0 +1,22 @@
+require 'spec_helper'
+
+describe Gitlab::SidekiqVersioning::Manager do
+ before do
+ Sidekiq::Manager.prepend described_class
+ end
+
+ describe '#initialize' do
+ it 'listens on all expanded queues' do
+ manager = Sidekiq::Manager.new(queues: %w[post_receive repository_fork cronjob unknown])
+
+ queues = manager.options[:queues]
+
+ expect(queues).to include('post_receive')
+ expect(queues).to include('repository_fork')
+ expect(queues).to include('cronjob')
+ expect(queues).to include('cronjob:stuck_import_jobs')
+ expect(queues).to include('cronjob:stuck_merge_jobs')
+ expect(queues).to include('unknown')
+ end
+ end
+end
diff --git a/spec/lib/gitlab/sidekiq_versioning_spec.rb b/spec/lib/gitlab/sidekiq_versioning_spec.rb
new file mode 100644
index 00000000000..fa6d42e730d
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_versioning_spec.rb
@@ -0,0 +1,44 @@
+require 'spec_helper'
+
+describe Gitlab::SidekiqVersioning, :sidekiq, :redis do
+ let(:foo_worker) do
+ Class.new do
+ def self.name
+ 'FooWorker'
+ end
+
+ include ApplicationWorker
+ end
+ end
+
+ let(:bar_worker) do
+ Class.new do
+ def self.name
+ 'BarWorker'
+ end
+
+ include ApplicationWorker
+ end
+ end
+
+ before do
+ allow(Gitlab::SidekiqConfig).to receive(:workers).and_return([foo_worker, bar_worker])
+ allow(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return([foo_worker.queue, bar_worker.queue])
+ end
+
+ describe '.install!' do
+ it 'prepends SidekiqVersioning::Manager into Sidekiq::Manager' do
+ described_class.install!
+
+ expect(Sidekiq::Manager).to include(Gitlab::SidekiqVersioning::Manager)
+ end
+
+ it 'registers all versionless and versioned queues with Redis' do
+ described_class.install!
+
+ queues = Sidekiq::Queue.all.map(&:name)
+ expect(queues).to include('foo')
+ expect(queues).to include('bar')
+ end
+ end
+end
diff --git a/spec/rubocop/cop/include_sidekiq_worker_spec.rb b/spec/rubocop/cop/include_sidekiq_worker_spec.rb
new file mode 100644
index 00000000000..7f406535dda
--- /dev/null
+++ b/spec/rubocop/cop/include_sidekiq_worker_spec.rb
@@ -0,0 +1,31 @@
+require 'spec_helper'
+require 'rubocop'
+require 'rubocop/rspec/support'
+require_relative '../../../rubocop/cop/include_sidekiq_worker'
+
+describe RuboCop::Cop::IncludeSidekiqWorker do
+ include CopHelper
+
+ subject(:cop) { described_class.new }
+
+ context 'when `Sidekiq::Worker` is included' do
+ let(:source) { 'include Sidekiq::Worker' }
+ let(:correct_source) { 'include ApplicationWorker' }
+
+ it 'registers an offense ' do
+ inspect_source(cop, source)
+
+ aggregate_failures do
+ expect(cop.offenses.size).to eq(1)
+ expect(cop.offenses.map(&:line)).to eq([1])
+ expect(cop.highlights).to eq(['Sidekiq::Worker'])
+ end
+ end
+
+ it 'autocorrects to the right version' do
+ autocorrected = autocorrect_source(cop, source)
+
+ expect(autocorrected).to eq(correct_source)
+ end
+ end
+end
diff --git a/spec/rubocop/cop/sidekiq_options_queue_spec.rb b/spec/rubocop/cop/sidekiq_options_queue_spec.rb
new file mode 100644
index 00000000000..a31de381631
--- /dev/null
+++ b/spec/rubocop/cop/sidekiq_options_queue_spec.rb
@@ -0,0 +1,26 @@
+require 'spec_helper'
+require 'rubocop'
+require 'rubocop/rspec/support'
+require_relative '../../../rubocop/cop/sidekiq_options_queue'
+
+describe RuboCop::Cop::SidekiqOptionsQueue do
+ include CopHelper
+
+ subject(:cop) { described_class.new }
+
+ it 'registers an offense when `sidekiq_options` is used with the `queue` option' do
+ inspect_source(cop, 'sidekiq_options queue: "some_queue"')
+
+ aggregate_failures do
+ expect(cop.offenses.size).to eq(1)
+ expect(cop.offenses.map(&:line)).to eq([1])
+ expect(cop.highlights).to eq(['queue: "some_queue"'])
+ end
+ end
+
+ it 'does not register an offense when `sidekiq_options` is used with another option' do
+ inspect_source(cop, 'sidekiq_options retry: false')
+
+ expect(cop.offenses).to be_empty
+ end
+end
diff --git a/spec/workers/concerns/application_worker_spec.rb b/spec/workers/concerns/application_worker_spec.rb
index 0145563e0ed..901d77178bc 100644
--- a/spec/workers/concerns/application_worker_spec.rb
+++ b/spec/workers/concerns/application_worker_spec.rb
@@ -17,6 +17,14 @@ describe ApplicationWorker do
end
end
+ describe '.queue_namespace' do
+ it 'sets the queue name based on the class name' do
+ worker.queue_namespace :some_namespace
+
+ expect(worker.queue).to eq('some_namespace:foo_bar_dummy')
+ end
+ end
+
describe '.queue' do
it 'returns the queue name' do
worker.sidekiq_options queue: :some_queue
diff --git a/spec/workers/concerns/cluster_queue_spec.rb b/spec/workers/concerns/cluster_queue_spec.rb
index 5049886b55c..4118b9aa194 100644
--- a/spec/workers/concerns/cluster_queue_spec.rb
+++ b/spec/workers/concerns/cluster_queue_spec.rb
@@ -14,6 +14,6 @@ describe ClusterQueue do
it 'sets a default pipelines queue automatically' do
expect(worker.sidekiq_options['queue'])
- .to eq :gcp_cluster
+ .to eq 'gcp_cluster:dummy'
end
end
diff --git a/spec/workers/concerns/cronjob_queue_spec.rb b/spec/workers/concerns/cronjob_queue_spec.rb
index 3ae1c5f54d8..c042a52f41f 100644
--- a/spec/workers/concerns/cronjob_queue_spec.rb
+++ b/spec/workers/concerns/cronjob_queue_spec.rb
@@ -13,7 +13,7 @@ describe CronjobQueue do
end
it 'sets the queue name of a worker' do
- expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob')
+ expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob:dummy')
end
it 'disables retrying of failed jobs' do
diff --git a/spec/workers/concerns/gitlab/github_import/queue_spec.rb b/spec/workers/concerns/gitlab/github_import/queue_spec.rb
index 9c69ee32da1..a96f583aff7 100644
--- a/spec/workers/concerns/gitlab/github_import/queue_spec.rb
+++ b/spec/workers/concerns/gitlab/github_import/queue_spec.rb
@@ -11,6 +11,6 @@ describe Gitlab::GithubImport::Queue do
include Gitlab::GithubImport::Queue
end
- expect(worker.sidekiq_options['queue']).to eq('github_importer')
+ expect(worker.sidekiq_options['queue']).to eq('github_importer:dummy')
end
end
diff --git a/spec/workers/concerns/pipeline_queue_spec.rb b/spec/workers/concerns/pipeline_queue_spec.rb
index dd911760948..a312b307fce 100644
--- a/spec/workers/concerns/pipeline_queue_spec.rb
+++ b/spec/workers/concerns/pipeline_queue_spec.rb
@@ -14,15 +14,6 @@ describe PipelineQueue do
it 'sets a default pipelines queue automatically' do
expect(worker.sidekiq_options['queue'])
- .to eq 'pipeline_default'
- end
-
- describe '.enqueue_in' do
- it 'sets a custom sidekiq queue with prefix and group' do
- worker.enqueue_in(group: :processing)
-
- expect(worker.sidekiq_options['queue'])
- .to eq 'pipeline_processing'
- end
+ .to eq 'pipeline_default:dummy'
end
end
diff --git a/spec/workers/concerns/repository_check_queue_spec.rb b/spec/workers/concerns/repository_check_queue_spec.rb
index fdbbfcc90a5..d2eeecfc9a8 100644
--- a/spec/workers/concerns/repository_check_queue_spec.rb
+++ b/spec/workers/concerns/repository_check_queue_spec.rb
@@ -13,7 +13,7 @@ describe RepositoryCheckQueue do
end
it 'sets the queue name of a worker' do
- expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check')
+ expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check:dummy')
end
it 'disables retrying of failed jobs' do
diff --git a/spec/workers/every_sidekiq_worker_spec.rb b/spec/workers/every_sidekiq_worker_spec.rb
index 7ee0a51a263..9e3b99b3502 100644
--- a/spec/workers/every_sidekiq_worker_spec.rb
+++ b/spec/workers/every_sidekiq_worker_spec.rb
@@ -1,21 +1,36 @@
require 'spec_helper'
describe 'Every Sidekiq worker' do
- it 'includes ApplicationWorker' do
- expect(Gitlab::SidekiqConfig.workers).to all(include(ApplicationWorker))
- end
-
it 'does not use the default queue' do
expect(Gitlab::SidekiqConfig.workers.map(&:queue)).not_to include('default')
end
it 'uses the cronjob queue when the worker runs as a cronjob' do
- expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(eq('cronjob'))
+ expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(start_with('cronjob:'))
+ end
+
+ it 'has its queue in app/workers/all_queues.yml', :aggregate_failures do
+ file_worker_queues = Gitlab::SidekiqConfig.worker_queues.to_set
+
+ worker_queues = Gitlab::SidekiqConfig.workers.map(&:queue).to_set
+ worker_queues << ActionMailer::DeliveryJob.queue_name
+ worker_queues << 'default'
+
+ missing_from_file = worker_queues - file_worker_queues
+ expect(missing_from_file).to be_empty, "expected #{missing_from_file.to_a.inspect} to be in app/workers/all_queues.yml"
+
+ unncessarily_in_file = file_worker_queues - worker_queues
+ expect(unncessarily_in_file).to be_empty, "expected #{unncessarily_in_file.to_a.inspect} not to be in app/workers/all_queues.yml"
end
- it 'defines the queue in the Sidekiq configuration file' do
- config_queue_names = Gitlab::SidekiqConfig.config_queues.to_set
+ it 'has its queue or namespace in config/sidekiq_queues.yml', :aggregate_failures do
+ config_queues = Gitlab::SidekiqConfig.config_queues.to_set
+
+ Gitlab::SidekiqConfig.workers.each do |worker|
+ queue = worker.queue
+ queue_namespace = queue.split(':').first
- expect(Gitlab::SidekiqConfig.worker_queues).to all(be_in(config_queue_names))
+ expect(config_queues).to include(queue).or(include(queue_namespace))
+ end
end
end