summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDouwe Maan <douwe@selenight.nl>2017-11-28 17:16:50 +0100
committerDouwe Maan <douwe@selenight.nl>2017-12-12 17:36:20 +0100
commitb1849ee2e66b6355776397717a33dc7ada772332 (patch)
tree42cc03621690637e947c0677686ecf24a9771245
parentd673628de003d1ce1402f03311066339828fb811 (diff)
downloadgitlab-ce-b1849ee2e66b6355776397717a33dc7ada772332.tar.gz
Use a dedicated queue for each worker
-rw-r--r--app/workers/all_queues.yml98
-rw-r--r--app/workers/build_finished_worker.rb2
-rw-r--r--app/workers/build_hooks_worker.rb2
-rw-r--r--app/workers/build_queue_worker.rb2
-rw-r--r--app/workers/build_success_worker.rb2
-rw-r--r--app/workers/concerns/application_worker.rb22
-rw-r--r--app/workers/concerns/cluster_queue.rb2
-rw-r--r--app/workers/concerns/cronjob_queue.rb3
-rw-r--r--app/workers/concerns/gitlab/github_import/queue.rb4
-rw-r--r--app/workers/concerns/pipeline_queue.rb10
-rw-r--r--app/workers/concerns/repository_check_queue.rb4
-rw-r--r--app/workers/create_pipeline_worker.rb2
-rw-r--r--app/workers/expire_job_cache_worker.rb2
-rw-r--r--app/workers/expire_pipeline_cache_worker.rb2
-rw-r--r--app/workers/gitlab/github_import/advance_stage_worker.rb2
-rw-r--r--app/workers/pages_worker.rb2
-rw-r--r--app/workers/pipeline_hooks_worker.rb2
-rw-r--r--app/workers/pipeline_process_worker.rb2
-rw-r--r--app/workers/pipeline_success_worker.rb2
-rw-r--r--app/workers/pipeline_update_worker.rb2
-rw-r--r--app/workers/stage_update_worker.rb2
-rw-r--r--app/workers/update_head_pipeline_for_merge_request_worker.rb3
-rw-r--r--config/initializers/sidekiq.rb18
-rw-r--r--config/sidekiq_queues.yml8
-rw-r--r--lib/gitlab/sidekiq_config.rb37
-rw-r--r--lib/gitlab/sidekiq_versioning.rb25
-rw-r--r--lib/gitlab/sidekiq_versioning/manager.rb12
-rw-r--r--spec/lib/gitlab/sidekiq_config_spec.rb23
-rw-r--r--spec/lib/gitlab/sidekiq_versioning/manager_spec.rb22
-rw-r--r--spec/lib/gitlab/sidekiq_versioning_spec.rb44
-rw-r--r--spec/workers/concerns/application_worker_spec.rb8
-rw-r--r--spec/workers/concerns/cluster_queue_spec.rb2
-rw-r--r--spec/workers/concerns/cronjob_queue_spec.rb2
-rw-r--r--spec/workers/concerns/gitlab/github_import/queue_spec.rb2
-rw-r--r--spec/workers/concerns/pipeline_queue_spec.rb11
-rw-r--r--spec/workers/concerns/repository_check_queue_spec.rb2
-rw-r--r--spec/workers/every_sidekiq_worker_spec.rb27
37 files changed, 336 insertions, 81 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
new file mode 100644
index 00000000000..ba31a5aa9c2
--- /dev/null
+++ b/app/workers/all_queues.yml
@@ -0,0 +1,98 @@
+---
+- cronjob:admin_email
+- cronjob:expire_build_artifacts
+- cronjob:gitlab_usage_ping
+- cronjob:import_export_project_cleanup
+- cronjob:pipeline_schedule
+- cronjob:prune_old_events
+- cronjob:remove_expired_group_links
+- cronjob:remove_expired_members
+- cronjob:remove_old_web_hook_logs
+- cronjob:remove_unreferenced_lfs_objects
+- cronjob:repository_archive_cache
+- cronjob:repository_check_batch
+- cronjob:requests_profiles
+- cronjob:schedule_update_user_activity
+- cronjob:stuck_ci_jobs
+- cronjob:stuck_import_jobs
+- cronjob:stuck_merge_jobs
+- cronjob:trending_projects
+
+- gcp_cluster:cluster_install_app
+- gcp_cluster:cluster_provision
+- gcp_cluster:cluster_wait_for_app_installation
+- gcp_cluster:wait_for_cluster_creation
+
+- github_import_advance_stage
+- github_importer:github_import_import_diff_note
+- github_importer:github_import_import_issue
+- github_importer:github_import_import_note
+- github_importer:github_import_import_pull_request
+- github_importer:github_import_refresh_import_jid
+- github_importer:github_import_stage_finish_import
+- github_importer:github_import_stage_import_base_data
+- github_importer:github_import_stage_import_issues_and_diff_notes
+- github_importer:github_import_stage_import_notes
+- github_importer:github_import_stage_import_pull_requests
+- github_importer:github_import_stage_import_repository
+
+- pipeline_cache:expire_job_cache
+- pipeline_cache:expire_pipeline_cache
+- pipeline_creation:create_pipeline
+- pipeline_default:build_coverage
+- pipeline_default:build_trace_sections
+- pipeline_default:pipeline_metrics
+- pipeline_default:pipeline_notification
+- pipeline_default:update_head_pipeline_for_merge_request
+- pipeline_hooks:build_hooks
+- pipeline_hooks:pipeline_hooks
+- pipeline_processing:build_finished
+- pipeline_processing:build_queue
+- pipeline_processing:build_success
+- pipeline_processing:pipeline_process
+- pipeline_processing:pipeline_success
+- pipeline_processing:pipeline_update
+- pipeline_processing:stage_update
+
+- repository_check:repository_check_clear
+- repository_check:repository_check_single_repository
+
+- default
+- mailers # ActionMailer::DeliveryJob.queue_name
+
+- authorized_projects
+- background_migration
+- create_gpg_signature
+- delete_merged_branches
+- delete_user
+- email_receiver
+- emails_on_push
+- expire_build_instance_artifacts
+- git_garbage_collect
+- gitlab_shell
+- group_destroy
+- invalid_gpg_signature_update
+- irker
+- merge
+- namespaceless_project_destroy
+- new_issue
+- new_merge_request
+- new_note
+- pages
+- post_receive
+- process_commit
+- project_cache
+- project_destroy
+- project_export
+- project_migrate_hashed_storage
+- project_service
+- propagate_service_template
+- reactive_caching
+- repository_fork
+- repository_import
+- storage_migrator
+- system_hook_push
+- update_merge_requests
+- update_user_activity
+- upload_checksum
+- web_hook
diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb
index 5efa9180f5e..97d80305bec 100644
--- a/app/workers/build_finished_worker.rb
+++ b/app/workers/build_finished_worker.rb
@@ -2,7 +2,7 @@ class BuildFinishedWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb
index 6705a1c2709..cbfca8c342c 100644
--- a/app/workers/build_hooks_worker.rb
+++ b/app/workers/build_hooks_worker.rb
@@ -2,7 +2,7 @@ class BuildHooksWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :hooks
+ queue_namespace :pipeline_hooks
def perform(build_id)
Ci::Build.find_by(id: build_id)
diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb
index fc775a84dc0..e4f4e6c1d9e 100644
--- a/app/workers/build_queue_worker.rb
+++ b/app/workers/build_queue_worker.rb
@@ -2,7 +2,7 @@ class BuildQueueWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb
index ec049821ad7..4b9097bc5e4 100644
--- a/app/workers/build_success_worker.rb
+++ b/app/workers/build_success_worker.rb
@@ -2,7 +2,7 @@ class BuildSuccessWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb
index 9c3bdabc49e..285b5bada7d 100644
--- a/app/workers/concerns/application_worker.rb
+++ b/app/workers/concerns/application_worker.rb
@@ -6,10 +6,20 @@ module ApplicationWorker
include Sidekiq::Worker
included do
- sidekiq_options queue: base_queue_name
+ set_queue
end
module ClassMethods
+ def inherited(subclass)
+ subclass.set_queue
+ end
+
+ def set_queue
+ queue_name = [queue_namespace, base_queue_name].compact.join(':')
+
+ sidekiq_options queue: queue_name
+ end
+
def base_queue_name
name
.sub(/\AGitlab::/, '')
@@ -18,6 +28,16 @@ module ApplicationWorker
.tr('/', '_')
end
+ def queue_namespace(new_namespace = nil)
+ if new_namespace
+ sidekiq_options queue_namespace: new_namespace
+
+ set_queue
+ else
+ get_sidekiq_options['queue_namespace']&.to_s
+ end
+ end
+
def queue
get_sidekiq_options['queue'].to_s
end
diff --git a/app/workers/concerns/cluster_queue.rb b/app/workers/concerns/cluster_queue.rb
index a5074d13220..24b9f145220 100644
--- a/app/workers/concerns/cluster_queue.rb
+++ b/app/workers/concerns/cluster_queue.rb
@@ -5,6 +5,6 @@ module ClusterQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: :gcp_cluster
+ queue_namespace :gcp_cluster
end
end
diff --git a/app/workers/concerns/cronjob_queue.rb b/app/workers/concerns/cronjob_queue.rb
index e918bb011e0..b6581779f6a 100644
--- a/app/workers/concerns/cronjob_queue.rb
+++ b/app/workers/concerns/cronjob_queue.rb
@@ -4,6 +4,7 @@ module CronjobQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: :cronjob, retry: false
+ queue_namespace :cronjob
+ sidekiq_options retry: false
end
end
diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb
index a2bee361b86..22c2ce458e8 100644
--- a/app/workers/concerns/gitlab/github_import/queue.rb
+++ b/app/workers/concerns/gitlab/github_import/queue.rb
@@ -4,12 +4,14 @@ module Gitlab
extend ActiveSupport::Concern
included do
+ queue_namespace :github_importer
+
# If a job produces an error it may block a stage from advancing
# forever. To prevent this from happening we prevent jobs from going to
# the dead queue. This does mean some resources may not be imported, but
# this is better than a project being stuck in the "import" state
# forever.
- sidekiq_options queue: 'github_importer', dead: false, retry: 5
+ sidekiq_options dead: false, retry: 5
end
end
end
diff --git a/app/workers/concerns/pipeline_queue.rb b/app/workers/concerns/pipeline_queue.rb
index ddf45b91345..e77093a6902 100644
--- a/app/workers/concerns/pipeline_queue.rb
+++ b/app/workers/concerns/pipeline_queue.rb
@@ -5,14 +5,6 @@ module PipelineQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: 'pipeline_default'
- end
-
- class_methods do
- def enqueue_in(group:)
- raise ArgumentError, 'Unspecified queue group!' if group.empty?
-
- sidekiq_options queue: "pipeline_#{group}"
- end
+ queue_namespace :pipeline_default
end
end
diff --git a/app/workers/concerns/repository_check_queue.rb b/app/workers/concerns/repository_check_queue.rb
index a597321ccf4..43fb66c31b0 100644
--- a/app/workers/concerns/repository_check_queue.rb
+++ b/app/workers/concerns/repository_check_queue.rb
@@ -3,6 +3,8 @@ module RepositoryCheckQueue
extend ActiveSupport::Concern
included do
- sidekiq_options queue: :repository_check, retry: false
+ queue_namespace :repository_check
+
+ sidekiq_options retry: false
end
end
diff --git a/app/workers/create_pipeline_worker.rb b/app/workers/create_pipeline_worker.rb
index 00cd7b85b9f..c3ac35e54f5 100644
--- a/app/workers/create_pipeline_worker.rb
+++ b/app/workers/create_pipeline_worker.rb
@@ -2,7 +2,7 @@ class CreatePipelineWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :creation
+ queue_namespace :pipeline_creation
def perform(project_id, user_id, ref, source, params = {})
project = Project.find(project_id)
diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb
index a591e2da519..7217364a9f2 100644
--- a/app/workers/expire_job_cache_worker.rb
+++ b/app/workers/expire_job_cache_worker.rb
@@ -2,7 +2,7 @@ class ExpireJobCacheWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :cache
+ queue_namespace :pipeline_cache
def perform(job_id)
job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id)
diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb
index a3ac32b437d..3e34de22c19 100644
--- a/app/workers/expire_pipeline_cache_worker.rb
+++ b/app/workers/expire_pipeline_cache_worker.rb
@@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :cache
+ queue_namespace :pipeline_cache
def perform(pipeline_id)
pipeline = Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb
index 400396d5755..f7f498af840 100644
--- a/app/workers/gitlab/github_import/advance_stage_worker.rb
+++ b/app/workers/gitlab/github_import/advance_stage_worker.rb
@@ -9,7 +9,7 @@ module Gitlab
class AdvanceStageWorker
include ApplicationWorker
- sidekiq_options queue: 'github_importer_advance_stage', dead: false
+ sidekiq_options dead: false
INTERVAL = 30.seconds.to_i
diff --git a/app/workers/pages_worker.rb b/app/workers/pages_worker.rb
index 62f733c02fc..3ec81d040b4 100644
--- a/app/workers/pages_worker.rb
+++ b/app/workers/pages_worker.rb
@@ -1,7 +1,7 @@
class PagesWorker
include ApplicationWorker
- sidekiq_options queue: :pages, retry: false
+ sidekiq_options retry: false
def perform(action, *arg)
send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb
index 661c29efe88..c94918ff4ee 100644
--- a/app/workers/pipeline_hooks_worker.rb
+++ b/app/workers/pipeline_hooks_worker.rb
@@ -2,7 +2,7 @@ class PipelineHooksWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :hooks
+ queue_namespace :pipeline_hooks
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb
index 07dbf6a971e..24424b3f472 100644
--- a/app/workers/pipeline_process_worker.rb
+++ b/app/workers/pipeline_process_worker.rb
@@ -2,7 +2,7 @@ class PipelineProcessWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/pipeline_success_worker.rb b/app/workers/pipeline_success_worker.rb
index 68c40a259e1..2ab0739a17f 100644
--- a/app/workers/pipeline_success_worker.rb
+++ b/app/workers/pipeline_success_worker.rb
@@ -2,7 +2,7 @@ class PipelineSuccessWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb
index 24a8a9fbed5..fc9da2d45b1 100644
--- a/app/workers/pipeline_update_worker.rb
+++ b/app/workers/pipeline_update_worker.rb
@@ -2,7 +2,7 @@ class PipelineUpdateWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
diff --git a/app/workers/stage_update_worker.rb b/app/workers/stage_update_worker.rb
index 69f2318d83b..e4b683fca33 100644
--- a/app/workers/stage_update_worker.rb
+++ b/app/workers/stage_update_worker.rb
@@ -2,7 +2,7 @@ class StageUpdateWorker
include ApplicationWorker
include PipelineQueue
- enqueue_in group: :processing
+ queue_namespace :pipeline_processing
def perform(stage_id)
Ci::Stage.find_by(id: stage_id).try do |stage|
diff --git a/app/workers/update_head_pipeline_for_merge_request_worker.rb b/app/workers/update_head_pipeline_for_merge_request_worker.rb
index 0a2e9b63578..f4d2ec28bea 100644
--- a/app/workers/update_head_pipeline_for_merge_request_worker.rb
+++ b/app/workers/update_head_pipeline_for_merge_request_worker.rb
@@ -1,7 +1,6 @@
class UpdateHeadPipelineForMergeRequestWorker
include ApplicationWorker
-
- sidekiq_options queue: 'pipeline_default'
+ include PipelineQueue
def perform(merge_request_id)
merge_request = MergeRequest.find(merge_request_id)
diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb
index ba4481ae602..0f164e628f9 100644
--- a/config/initializers/sidekiq.rb
+++ b/config/initializers/sidekiq.rb
@@ -42,6 +42,8 @@ Sidekiq.configure_server do |config|
Gitlab::SidekiqThrottler.execute!
+ Gitlab::SidekiqVersioning.install!
+
config = Gitlab::Database.config ||
Rails.application.config.database_configuration[Rails.env]
config['pool'] = Sidekiq.options[:concurrency]
@@ -60,19 +62,3 @@ Sidekiq.configure_client do |config|
chain.add Gitlab::SidekiqStatus::ClientMiddleware
end
end
-
-# The Sidekiq client API always adds the queue to the Sidekiq queue
-# list, but mail_room and gitlab-shell do not. This is only necessary
-# for monitoring.
-begin
- queues = Gitlab::SidekiqConfig.worker_queues
-
- Sidekiq.redis do |conn|
- conn.pipelined do
- queues.each do |queue|
- conn.sadd('queues', queue)
- end
- end
- end
-rescue Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED
-end
diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml
index bc7c431731a..5c8233d796f 100644
--- a/config/sidekiq_queues.yml
+++ b/config/sidekiq_queues.yml
@@ -25,8 +25,8 @@
- [new_note, 2]
- [new_issue, 2]
- [new_merge_request, 2]
- - [build, 2]
- - [pipeline, 2]
+ - [build, 2] # Replaced by pipeline
+ - [pipeline, 2] # Replaced by pipeline_*
- [pipeline_processing, 5]
- [pipeline_creation, 4]
- [pipeline_default, 3]
@@ -38,11 +38,13 @@
- [mailers, 2]
- [invalid_gpg_signature_update, 2]
- [create_gpg_signature, 2]
+ - [rebase, 2]
- [upload_checksum, 1]
- [repository_fork, 1]
- [repository_import, 1]
- [github_importer, 1]
- - [github_importer_advance_stage, 1]
+ - [github_importer_advance_stage, 1] # Replaced by github_import_advance_stage
+ - [github_import_advance_stage, 1]
- [project_service, 1]
- [delete_user, 1]
- [delete_merged_branches, 1]
diff --git a/lib/gitlab/sidekiq_config.rb b/lib/gitlab/sidekiq_config.rb
index dc9886732b5..c3d7814551c 100644
--- a/lib/gitlab/sidekiq_config.rb
+++ b/lib/gitlab/sidekiq_config.rb
@@ -1,16 +1,35 @@
require 'yaml'
+require 'set'
module Gitlab
module SidekiqConfig
- def self.redis_queues
- @redis_queues ||= Sidekiq::Queue.all.map(&:name)
+ # This method is called by `bin/sidekiq-cluster` in EE, which runs outside
+ # of bundler/Rails context, so we cannot use any gem or Rails methods.
+ def self.worker_queues(rails_path = Rails.root.to_s)
+ @worker_queues ||= {}
+ @worker_queues[rails_path] ||= YAML.load_file(File.join(rails_path, 'app/workers/all_queues.yml'))
end
# This method is called by `bin/sidekiq-cluster` in EE, which runs outside
# of bundler/Rails context, so we cannot use any gem or Rails methods.
- def self.config_queues(rails_path = Rails.root.to_s)
+ def self.expand_queues(queues, all_queues = self.worker_queues)
+ return [] if queues.empty?
+
+ queues_set = all_queues.to_set
+
+ queues.flat_map do |queue|
+ [queue, *queues_set.grep(/\A#{queue}:/)]
+ end
+ end
+
+ def self.redis_queues
+ # Not memoized, because this can change during the life of the application
+ Sidekiq::Queue.all.map(&:name)
+ end
+
+ def self.config_queues
@config_queues ||= begin
- config = YAML.load_file(File.join(rails_path, 'config', 'sidekiq_queues.yml'))
+ config = YAML.load_file(Rails.root.join('config/sidekiq_queues.yml'))
config[:queues].map(&:first)
end
end
@@ -23,14 +42,6 @@ module Gitlab
@workers ||= find_workers(Rails.root.join('app', 'workers'))
end
- def self.default_queues
- [ActionMailer::DeliveryJob.queue_name, 'default']
- end
-
- def self.worker_queues
- @worker_queues ||= (workers.map(&:queue) + default_queues).uniq
- end
-
def self.find_workers(root)
concerns = root.join('concerns').to_s
@@ -43,7 +54,7 @@ module Gitlab
ns.camelize.constantize
end
- # Skip concerns
+ # Skip things that aren't workers
workers.select { |w| w < Sidekiq::Worker }
end
end
diff --git a/lib/gitlab/sidekiq_versioning.rb b/lib/gitlab/sidekiq_versioning.rb
new file mode 100644
index 00000000000..9683214ec18
--- /dev/null
+++ b/lib/gitlab/sidekiq_versioning.rb
@@ -0,0 +1,25 @@
+module Gitlab
+ module SidekiqVersioning
+ def self.install!
+ Sidekiq::Manager.prepend SidekiqVersioning::Manager
+
+ # The Sidekiq client API always adds the queue to the Sidekiq queue
+ # list, but mail_room and gitlab-shell do not. This is only necessary
+ # for monitoring.
+ begin
+ queues = SidekiqConfig.worker_queues
+
+ if queues.any?
+ Sidekiq.redis do |conn|
+ conn.pipelined do
+ queues.each do |queue|
+ conn.sadd('queues', queue)
+ end
+ end
+ end
+ end
+ rescue ::Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_versioning/manager.rb b/lib/gitlab/sidekiq_versioning/manager.rb
new file mode 100644
index 00000000000..308be0fdf76
--- /dev/null
+++ b/lib/gitlab/sidekiq_versioning/manager.rb
@@ -0,0 +1,12 @@
+module Gitlab
+ module SidekiqVersioning
+ module Manager
+ def initialize(options = {})
+ options[:strict] = false
+ options[:queues] = SidekiqConfig.expand_queues(options[:queues])
+ Sidekiq.logger.info "Listening on queues #{options[:queues].uniq.sort}"
+ super
+ end
+ end
+ end
+end
diff --git a/spec/lib/gitlab/sidekiq_config_spec.rb b/spec/lib/gitlab/sidekiq_config_spec.rb
index 09f95be2213..0c66d764851 100644
--- a/spec/lib/gitlab/sidekiq_config_spec.rb
+++ b/spec/lib/gitlab/sidekiq_config_spec.rb
@@ -16,9 +16,30 @@ describe Gitlab::SidekiqConfig do
expect(queues).to include('post_receive')
expect(queues).to include('merge')
- expect(queues).to include('cronjob')
+ expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('mailers')
expect(queues).to include('default')
end
end
+
+ describe '.expand_queues' do
+ it 'expands queue namespaces to concrete queue names' do
+ queues = described_class.expand_queues(%w[cronjob])
+
+ expect(queues).to include('cronjob:stuck_import_jobs')
+ expect(queues).to include('cronjob:stuck_merge_jobs')
+ end
+
+ it 'lets concrete queue names pass through' do
+ queues = described_class.expand_queues(%w[post_receive])
+
+ expect(queues).to include('post_receive')
+ end
+
+ it 'lets unknown queues pass through' do
+ queues = described_class.expand_queues(%w[unknown])
+
+ expect(queues).to include('unknown')
+ end
+ end
end
diff --git a/spec/lib/gitlab/sidekiq_versioning/manager_spec.rb b/spec/lib/gitlab/sidekiq_versioning/manager_spec.rb
new file mode 100644
index 00000000000..7debf70a16f
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_versioning/manager_spec.rb
@@ -0,0 +1,22 @@
+require 'spec_helper'
+
+describe Gitlab::SidekiqVersioning::Manager do
+ before do
+ Sidekiq::Manager.prepend described_class
+ end
+
+ describe '#initialize' do
+ it 'listens on all expanded queues' do
+ manager = Sidekiq::Manager.new(queues: %w[post_receive repository_fork cronjob unknown])
+
+ queues = manager.options[:queues]
+
+ expect(queues).to include('post_receive')
+ expect(queues).to include('repository_fork')
+ expect(queues).to include('cronjob')
+ expect(queues).to include('cronjob:stuck_import_jobs')
+ expect(queues).to include('cronjob:stuck_merge_jobs')
+ expect(queues).to include('unknown')
+ end
+ end
+end
diff --git a/spec/lib/gitlab/sidekiq_versioning_spec.rb b/spec/lib/gitlab/sidekiq_versioning_spec.rb
new file mode 100644
index 00000000000..fa6d42e730d
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_versioning_spec.rb
@@ -0,0 +1,44 @@
+require 'spec_helper'
+
+describe Gitlab::SidekiqVersioning, :sidekiq, :redis do
+ let(:foo_worker) do
+ Class.new do
+ def self.name
+ 'FooWorker'
+ end
+
+ include ApplicationWorker
+ end
+ end
+
+ let(:bar_worker) do
+ Class.new do
+ def self.name
+ 'BarWorker'
+ end
+
+ include ApplicationWorker
+ end
+ end
+
+ before do
+ allow(Gitlab::SidekiqConfig).to receive(:workers).and_return([foo_worker, bar_worker])
+ allow(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return([foo_worker.queue, bar_worker.queue])
+ end
+
+ describe '.install!' do
+ it 'prepends SidekiqVersioning::Manager into Sidekiq::Manager' do
+ described_class.install!
+
+ expect(Sidekiq::Manager).to include(Gitlab::SidekiqVersioning::Manager)
+ end
+
+ it 'registers all versionless and versioned queues with Redis' do
+ described_class.install!
+
+ queues = Sidekiq::Queue.all.map(&:name)
+ expect(queues).to include('foo')
+ expect(queues).to include('bar')
+ end
+ end
+end
diff --git a/spec/workers/concerns/application_worker_spec.rb b/spec/workers/concerns/application_worker_spec.rb
index 0145563e0ed..901d77178bc 100644
--- a/spec/workers/concerns/application_worker_spec.rb
+++ b/spec/workers/concerns/application_worker_spec.rb
@@ -17,6 +17,14 @@ describe ApplicationWorker do
end
end
+ describe '.queue_namespace' do
+ it 'sets the queue name based on the class name' do
+ worker.queue_namespace :some_namespace
+
+ expect(worker.queue).to eq('some_namespace:foo_bar_dummy')
+ end
+ end
+
describe '.queue' do
it 'returns the queue name' do
worker.sidekiq_options queue: :some_queue
diff --git a/spec/workers/concerns/cluster_queue_spec.rb b/spec/workers/concerns/cluster_queue_spec.rb
index 5049886b55c..4118b9aa194 100644
--- a/spec/workers/concerns/cluster_queue_spec.rb
+++ b/spec/workers/concerns/cluster_queue_spec.rb
@@ -14,6 +14,6 @@ describe ClusterQueue do
it 'sets a default pipelines queue automatically' do
expect(worker.sidekiq_options['queue'])
- .to eq :gcp_cluster
+ .to eq 'gcp_cluster:dummy'
end
end
diff --git a/spec/workers/concerns/cronjob_queue_spec.rb b/spec/workers/concerns/cronjob_queue_spec.rb
index 3ae1c5f54d8..c042a52f41f 100644
--- a/spec/workers/concerns/cronjob_queue_spec.rb
+++ b/spec/workers/concerns/cronjob_queue_spec.rb
@@ -13,7 +13,7 @@ describe CronjobQueue do
end
it 'sets the queue name of a worker' do
- expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob')
+ expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob:dummy')
end
it 'disables retrying of failed jobs' do
diff --git a/spec/workers/concerns/gitlab/github_import/queue_spec.rb b/spec/workers/concerns/gitlab/github_import/queue_spec.rb
index 9c69ee32da1..a96f583aff7 100644
--- a/spec/workers/concerns/gitlab/github_import/queue_spec.rb
+++ b/spec/workers/concerns/gitlab/github_import/queue_spec.rb
@@ -11,6 +11,6 @@ describe Gitlab::GithubImport::Queue do
include Gitlab::GithubImport::Queue
end
- expect(worker.sidekiq_options['queue']).to eq('github_importer')
+ expect(worker.sidekiq_options['queue']).to eq('github_importer:dummy')
end
end
diff --git a/spec/workers/concerns/pipeline_queue_spec.rb b/spec/workers/concerns/pipeline_queue_spec.rb
index dd911760948..a312b307fce 100644
--- a/spec/workers/concerns/pipeline_queue_spec.rb
+++ b/spec/workers/concerns/pipeline_queue_spec.rb
@@ -14,15 +14,6 @@ describe PipelineQueue do
it 'sets a default pipelines queue automatically' do
expect(worker.sidekiq_options['queue'])
- .to eq 'pipeline_default'
- end
-
- describe '.enqueue_in' do
- it 'sets a custom sidekiq queue with prefix and group' do
- worker.enqueue_in(group: :processing)
-
- expect(worker.sidekiq_options['queue'])
- .to eq 'pipeline_processing'
- end
+ .to eq 'pipeline_default:dummy'
end
end
diff --git a/spec/workers/concerns/repository_check_queue_spec.rb b/spec/workers/concerns/repository_check_queue_spec.rb
index fdbbfcc90a5..d2eeecfc9a8 100644
--- a/spec/workers/concerns/repository_check_queue_spec.rb
+++ b/spec/workers/concerns/repository_check_queue_spec.rb
@@ -13,7 +13,7 @@ describe RepositoryCheckQueue do
end
it 'sets the queue name of a worker' do
- expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check')
+ expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check:dummy')
end
it 'disables retrying of failed jobs' do
diff --git a/spec/workers/every_sidekiq_worker_spec.rb b/spec/workers/every_sidekiq_worker_spec.rb
index 7ee0a51a263..bf99c24e73c 100644
--- a/spec/workers/every_sidekiq_worker_spec.rb
+++ b/spec/workers/every_sidekiq_worker_spec.rb
@@ -10,12 +10,31 @@ describe 'Every Sidekiq worker' do
end
it 'uses the cronjob queue when the worker runs as a cronjob' do
- expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(eq('cronjob'))
+ expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(start_with('cronjob:'))
end
- it 'defines the queue in the Sidekiq configuration file' do
- config_queue_names = Gitlab::SidekiqConfig.config_queues.to_set
+ it 'has its queue in app/workers/all_queues.yml', :aggregate_failures do
+ file_worker_queues = Gitlab::SidekiqConfig.worker_queues.to_set
- expect(Gitlab::SidekiqConfig.worker_queues).to all(be_in(config_queue_names))
+ worker_queues = Gitlab::SidekiqConfig.workers.map(&:queue).to_set
+ worker_queues << ActionMailer::DeliveryJob.queue_name
+ worker_queues << 'default'
+
+ missing_from_file = worker_queues - file_worker_queues
+ expect(missing_from_file).to be_empty, "expected #{missing_from_file.to_a.inspect} to be in app/workers/all_queues.yml"
+
+ unncessarily_in_file = file_worker_queues - worker_queues
+ expect(unncessarily_in_file).to be_empty, "expected #{unncessarily_in_file.to_a.inspect} not to be in app/workers/all_queues.yml"
+ end
+
+ it 'has its queue or namespace in config/sidekiq_queues.yml', :aggregate_failures do
+ config_queues = Gitlab::SidekiqConfig.config_queues.to_set
+
+ Gitlab::SidekiqConfig.workers.each do |worker|
+ queue = worker.queue
+ queue_namespace = queue.split(':').first
+
+ expect(config_queues).to include(queue).or(include(queue_namespace))
+ end
end
end