diff options
45 files changed, 510 insertions, 102 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml new file mode 100644 index 00000000000..ba31a5aa9c2 --- /dev/null +++ b/app/workers/all_queues.yml @@ -0,0 +1,98 @@ +--- +- cronjob:admin_email +- cronjob:expire_build_artifacts +- cronjob:gitlab_usage_ping +- cronjob:import_export_project_cleanup +- cronjob:pipeline_schedule +- cronjob:prune_old_events +- cronjob:remove_expired_group_links +- cronjob:remove_expired_members +- cronjob:remove_old_web_hook_logs +- cronjob:remove_unreferenced_lfs_objects +- cronjob:repository_archive_cache +- cronjob:repository_check_batch +- cronjob:requests_profiles +- cronjob:schedule_update_user_activity +- cronjob:stuck_ci_jobs +- cronjob:stuck_import_jobs +- cronjob:stuck_merge_jobs +- cronjob:trending_projects + +- gcp_cluster:cluster_install_app +- gcp_cluster:cluster_provision +- gcp_cluster:cluster_wait_for_app_installation +- gcp_cluster:wait_for_cluster_creation + +- github_import_advance_stage +- github_importer:github_import_import_diff_note +- github_importer:github_import_import_issue +- github_importer:github_import_import_note +- github_importer:github_import_import_pull_request +- github_importer:github_import_refresh_import_jid +- github_importer:github_import_stage_finish_import +- github_importer:github_import_stage_import_base_data +- github_importer:github_import_stage_import_issues_and_diff_notes +- github_importer:github_import_stage_import_notes +- github_importer:github_import_stage_import_pull_requests +- github_importer:github_import_stage_import_repository + +- pipeline_cache:expire_job_cache +- pipeline_cache:expire_pipeline_cache +- pipeline_creation:create_pipeline +- pipeline_default:build_coverage +- pipeline_default:build_trace_sections +- pipeline_default:pipeline_metrics +- pipeline_default:pipeline_notification +- pipeline_default:update_head_pipeline_for_merge_request +- pipeline_hooks:build_hooks +- pipeline_hooks:pipeline_hooks +- pipeline_processing:build_finished +- pipeline_processing:build_queue +- pipeline_processing:build_success +- pipeline_processing:pipeline_process +- pipeline_processing:pipeline_success +- pipeline_processing:pipeline_update +- pipeline_processing:stage_update + +- repository_check:repository_check_clear +- repository_check:repository_check_single_repository + +- default +- mailers # ActionMailer::DeliveryJob.queue_name + +- authorized_projects +- background_migration +- create_gpg_signature +- delete_merged_branches +- delete_user +- email_receiver +- emails_on_push +- expire_build_instance_artifacts +- git_garbage_collect +- gitlab_shell +- group_destroy +- invalid_gpg_signature_update +- irker +- merge +- namespaceless_project_destroy +- new_issue +- new_merge_request +- new_note +- pages +- post_receive +- process_commit +- project_cache +- project_destroy +- project_export +- project_migrate_hashed_storage +- project_service +- propagate_service_template +- reactive_caching +- repository_fork +- repository_import +- storage_migrator +- system_hook_push +- update_merge_requests +- update_user_activity +- upload_checksum +- web_hook diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb index 5efa9180f5e..97d80305bec 100644 --- a/app/workers/build_finished_worker.rb +++ b/app/workers/build_finished_worker.rb @@ -2,7 +2,7 @@ class BuildFinishedWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb index 6705a1c2709..cbfca8c342c 100644 --- a/app/workers/build_hooks_worker.rb +++ b/app/workers/build_hooks_worker.rb @@ -2,7 +2,7 @@ class BuildHooksWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :hooks + queue_namespace :pipeline_hooks def perform(build_id) Ci::Build.find_by(id: build_id) diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb index fc775a84dc0..e4f4e6c1d9e 100644 --- a/app/workers/build_queue_worker.rb +++ b/app/workers/build_queue_worker.rb @@ -2,7 +2,7 @@ class BuildQueueWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb index ec049821ad7..4b9097bc5e4 100644 --- a/app/workers/build_success_worker.rb +++ b/app/workers/build_success_worker.rb @@ -2,7 +2,7 @@ class BuildSuccessWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index 9c3bdabc49e..37586e161c9 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -3,13 +3,23 @@ Sidekiq::Worker.extend ActiveSupport::Concern module ApplicationWorker extend ActiveSupport::Concern - include Sidekiq::Worker + include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker included do - sidekiq_options queue: base_queue_name + set_queue end module ClassMethods + def inherited(subclass) + subclass.set_queue + end + + def set_queue + queue_name = [queue_namespace, base_queue_name].compact.join(':') + + sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue + end + def base_queue_name name .sub(/\AGitlab::/, '') @@ -18,6 +28,16 @@ module ApplicationWorker .tr('/', '_') end + def queue_namespace(new_namespace = nil) + if new_namespace + sidekiq_options queue_namespace: new_namespace + + set_queue + else + get_sidekiq_options['queue_namespace']&.to_s + end + end + def queue get_sidekiq_options['queue'].to_s end diff --git a/app/workers/concerns/cluster_queue.rb b/app/workers/concerns/cluster_queue.rb index a5074d13220..24b9f145220 100644 --- a/app/workers/concerns/cluster_queue.rb +++ b/app/workers/concerns/cluster_queue.rb @@ -5,6 +5,6 @@ module ClusterQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :gcp_cluster + queue_namespace :gcp_cluster end end diff --git a/app/workers/concerns/cronjob_queue.rb b/app/workers/concerns/cronjob_queue.rb index e918bb011e0..b6581779f6a 100644 --- a/app/workers/concerns/cronjob_queue.rb +++ b/app/workers/concerns/cronjob_queue.rb @@ -4,6 +4,7 @@ module CronjobQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :cronjob, retry: false + queue_namespace :cronjob + sidekiq_options retry: false end end diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb index a2bee361b86..22c2ce458e8 100644 --- a/app/workers/concerns/gitlab/github_import/queue.rb +++ b/app/workers/concerns/gitlab/github_import/queue.rb @@ -4,12 +4,14 @@ module Gitlab extend ActiveSupport::Concern included do + queue_namespace :github_importer + # If a job produces an error it may block a stage from advancing # forever. To prevent this from happening we prevent jobs from going to # the dead queue. This does mean some resources may not be imported, but # this is better than a project being stuck in the "import" state # forever. - sidekiq_options queue: 'github_importer', dead: false, retry: 5 + sidekiq_options dead: false, retry: 5 end end end diff --git a/app/workers/concerns/pipeline_queue.rb b/app/workers/concerns/pipeline_queue.rb index ddf45b91345..e77093a6902 100644 --- a/app/workers/concerns/pipeline_queue.rb +++ b/app/workers/concerns/pipeline_queue.rb @@ -5,14 +5,6 @@ module PipelineQueue extend ActiveSupport::Concern included do - sidekiq_options queue: 'pipeline_default' - end - - class_methods do - def enqueue_in(group:) - raise ArgumentError, 'Unspecified queue group!' if group.empty? - - sidekiq_options queue: "pipeline_#{group}" - end + queue_namespace :pipeline_default end end diff --git a/app/workers/concerns/repository_check_queue.rb b/app/workers/concerns/repository_check_queue.rb index a597321ccf4..43fb66c31b0 100644 --- a/app/workers/concerns/repository_check_queue.rb +++ b/app/workers/concerns/repository_check_queue.rb @@ -3,6 +3,8 @@ module RepositoryCheckQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :repository_check, retry: false + queue_namespace :repository_check + + sidekiq_options retry: false end end diff --git a/app/workers/create_pipeline_worker.rb b/app/workers/create_pipeline_worker.rb index 00cd7b85b9f..c3ac35e54f5 100644 --- a/app/workers/create_pipeline_worker.rb +++ b/app/workers/create_pipeline_worker.rb @@ -2,7 +2,7 @@ class CreatePipelineWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :creation + queue_namespace :pipeline_creation def perform(project_id, user_id, ref, source, params = {}) project = Project.find(project_id) diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb index a591e2da519..7217364a9f2 100644 --- a/app/workers/expire_job_cache_worker.rb +++ b/app/workers/expire_job_cache_worker.rb @@ -2,7 +2,7 @@ class ExpireJobCacheWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :cache + queue_namespace :pipeline_cache def perform(job_id) job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id) diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb index a3ac32b437d..3e34de22c19 100644 --- a/app/workers/expire_pipeline_cache_worker.rb +++ b/app/workers/expire_pipeline_cache_worker.rb @@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :cache + queue_namespace :pipeline_cache def perform(pipeline_id) pipeline = Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb index 400396d5755..f7f498af840 100644 --- a/app/workers/gitlab/github_import/advance_stage_worker.rb +++ b/app/workers/gitlab/github_import/advance_stage_worker.rb @@ -9,7 +9,7 @@ module Gitlab class AdvanceStageWorker include ApplicationWorker - sidekiq_options queue: 'github_importer_advance_stage', dead: false + sidekiq_options dead: false INTERVAL = 30.seconds.to_i diff --git a/app/workers/pages_worker.rb b/app/workers/pages_worker.rb index 62f733c02fc..3ec81d040b4 100644 --- a/app/workers/pages_worker.rb +++ b/app/workers/pages_worker.rb @@ -1,7 +1,7 @@ class PagesWorker include ApplicationWorker - sidekiq_options queue: :pages, retry: false + sidekiq_options retry: false def perform(action, *arg) send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb index 661c29efe88..c94918ff4ee 100644 --- a/app/workers/pipeline_hooks_worker.rb +++ b/app/workers/pipeline_hooks_worker.rb @@ -2,7 +2,7 @@ class PipelineHooksWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :hooks + queue_namespace :pipeline_hooks def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb index 07dbf6a971e..24424b3f472 100644 --- a/app/workers/pipeline_process_worker.rb +++ b/app/workers/pipeline_process_worker.rb @@ -2,7 +2,7 @@ class PipelineProcessWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/pipeline_success_worker.rb b/app/workers/pipeline_success_worker.rb index 68c40a259e1..2ab0739a17f 100644 --- a/app/workers/pipeline_success_worker.rb +++ b/app/workers/pipeline_success_worker.rb @@ -2,7 +2,7 @@ class PipelineSuccessWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb index 24a8a9fbed5..fc9da2d45b1 100644 --- a/app/workers/pipeline_update_worker.rb +++ b/app/workers/pipeline_update_worker.rb @@ -2,7 +2,7 @@ class PipelineUpdateWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/stage_update_worker.rb b/app/workers/stage_update_worker.rb index 69f2318d83b..e4b683fca33 100644 --- a/app/workers/stage_update_worker.rb +++ b/app/workers/stage_update_worker.rb @@ -2,7 +2,7 @@ class StageUpdateWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(stage_id) Ci::Stage.find_by(id: stage_id).try do |stage| diff --git a/app/workers/update_head_pipeline_for_merge_request_worker.rb b/app/workers/update_head_pipeline_for_merge_request_worker.rb index 68c71a2b7a7..f09d89aa170 100644 --- a/app/workers/update_head_pipeline_for_merge_request_worker.rb +++ b/app/workers/update_head_pipeline_for_merge_request_worker.rb @@ -1,7 +1,6 @@ class UpdateHeadPipelineForMergeRequestWorker include ApplicationWorker - - sidekiq_options queue: 'pipeline_default' + include PipelineQueue def perform(merge_request_id) merge_request = MergeRequest.find(merge_request_id) diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index ba4481ae602..0f164e628f9 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -42,6 +42,8 @@ Sidekiq.configure_server do |config| Gitlab::SidekiqThrottler.execute! + Gitlab::SidekiqVersioning.install! + config = Gitlab::Database.config || Rails.application.config.database_configuration[Rails.env] config['pool'] = Sidekiq.options[:concurrency] @@ -60,19 +62,3 @@ Sidekiq.configure_client do |config| chain.add Gitlab::SidekiqStatus::ClientMiddleware end end - -# The Sidekiq client API always adds the queue to the Sidekiq queue -# list, but mail_room and gitlab-shell do not. This is only necessary -# for monitoring. -begin - queues = Gitlab::SidekiqConfig.worker_queues - - Sidekiq.redis do |conn| - conn.pipelined do - queues.each do |queue| - conn.sadd('queues', queue) - end - end - end -rescue Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED -end diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index bc7c431731a..31a38f2b508 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -25,8 +25,6 @@ - [new_note, 2] - [new_issue, 2] - [new_merge_request, 2] - - [build, 2] - - [pipeline, 2] - [pipeline_processing, 5] - [pipeline_creation, 4] - [pipeline_default, 3] @@ -38,11 +36,12 @@ - [mailers, 2] - [invalid_gpg_signature_update, 2] - [create_gpg_signature, 2] + - [rebase, 2] - [upload_checksum, 1] - [repository_fork, 1] - [repository_import, 1] - [github_importer, 1] - - [github_importer_advance_stage, 1] + - [github_import_advance_stage, 1] - [project_service, 1] - [delete_user, 1] - [delete_merged_branches, 1] diff --git a/db/post_migrate/20171213160445_migrate_github_importer_advance_stage_sidekiq_queue.rb b/db/post_migrate/20171213160445_migrate_github_importer_advance_stage_sidekiq_queue.rb new file mode 100644 index 00000000000..149c28f1946 --- /dev/null +++ b/db/post_migrate/20171213160445_migrate_github_importer_advance_stage_sidekiq_queue.rb @@ -0,0 +1,16 @@ +# See http://doc.gitlab.com/ce/development/migration_style_guide.html +# for more information on how to write migrations for GitLab. + +class MigrateGithubImporterAdvanceStageSidekiqQueue < ActiveRecord::Migration + include Gitlab::Database::MigrationHelpers + + DOWNTIME = false + + def up + sidekiq_queue_migrate 'github_importer_advance_stage', to: 'github_import_advance_stage' + end + + def down + sidekiq_queue_migrate 'github_import_advance_stage', to: 'github_importer_advance_stage' + end +end diff --git a/db/schema.rb b/db/schema.rb index f0b1da16d53..2048c50f892 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -11,7 +11,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 20171206221519) do +ActiveRecord::Schema.define(version: 20171213160445) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" diff --git a/doc/development/sidekiq_style_guide.md b/doc/development/sidekiq_style_guide.md index 085fb8f902c..59ebf41e09f 100644 --- a/doc/development/sidekiq_style_guide.md +++ b/doc/development/sidekiq_style_guide.md @@ -9,25 +9,54 @@ All workers should include `ApplicationWorker` instead of `Sidekiq::Worker`, which adds some convenience methods and automatically sets the queue based on the worker's name. -## Default Queue +## Dedicated Queues -Use of the "default" queue is not allowed. Every worker should use a queue that -matches the worker's purpose the closest. For example, workers that are to be -executed periodically should use the "cronjob" queue. +All workers should use their own queue, which is automatically set based on the +worker class name. For a worker named `ProcessSomethingWorker`, the queue name +would be `process_something`. If you're not sure what queue a worker uses, +you can find it using `SomeWorker.queue`. There is almost never a reason to +manually override the queue name using `sidekiq_options queue: :some_queue`. -A list of all available queues can be found in `config/sidekiq_queues.yml`. +## Queue Namespaces -## Dedicated Queues +While different workers cannot share a queue, they can share a queue namespace. -Most workers should use their own queue, which is automatically set based on the -worker class name. For a worker named `ProcessSomethingWorker`, the queue name -would be `process_something`. If you're not sure what a worker's queue name is, -you can find it using `SomeWorker.queue`. +Defining a queue namespace for a worker makes it possible to start a Sidekiq +process that automatically handles jobs for all workers in that namespace, +without needing to explicitly list all their queue names. If, for example, all +workers that are managed by sidekiq-cron use the `cronjob` queue namespace, we +can spin up a Sidekiq process specifically for these kinds of scheduled jobs. +If a new worker using the `cronjob` namespace is added later on, the Sidekiq +process will automatically pick up jobs for that worker too (after having been +restarted), without the need to change any configuration. + +A queue namespace can be set using the `queue_namespace` DSL class method: + +```ruby +class SomeScheduledTaskWorker + include ApplicationWorker + + queue_namespace :cronjob + + # ... +end +``` + +Behind the scenes, this will set `SomeScheduledTaskWorker.queue` to +`cronjob:some_scheduled_task`. Commonly used namespaces will have their own +concern module that can easily be included into the worker class, and that may +set other Sidekiq options besides the queue namespace. `CronjobQueue`, for +example, sets the namespace, but also disables retries. + +`bundle exec sidekiq` is namespace-aware, and will automatically listen on all +queues in a namespace (technically: all queues prefixed with the namespace name) +when a namespace is provided instead of a simple queue name in the `--queue` +(`-q`) option, or in the `:queues:` section in `config/sidekiq_queues.yml`. -In some cases multiple workers do use the same queue. For example, the various -workers for updating CI pipelines all use the `pipeline` queue. Adding workers -to existing queues should be done with care, as adding more workers can lead to -slow jobs blocking work (even for different jobs) on the shared queue. +Note that adding a worker to an existing namespace should be done with care, as +the extra jobs will take resources away from jobs from workers that were already +there, if the resources available to the Sidekiq process handling the namespace +are not adjusted appropriately. ## Tests @@ -36,7 +65,7 @@ tests should be placed in `spec/workers`. ## Removing or renaming queues -Try to avoid renaming or removing queues in minor and patch releases. +Try to avoid renaming or removing workers and their queues in minor and patch releases. During online update instance can have pending jobs and removing the queue can lead to those jobs being stuck forever. If you can't write migration for those Sidekiq jobs, please consider doing rename or remove queue in major release only. diff --git a/lib/gitlab/sidekiq_config.rb b/lib/gitlab/sidekiq_config.rb index dc9886732b5..c3d7814551c 100644 --- a/lib/gitlab/sidekiq_config.rb +++ b/lib/gitlab/sidekiq_config.rb @@ -1,16 +1,35 @@ require 'yaml' +require 'set' module Gitlab module SidekiqConfig - def self.redis_queues - @redis_queues ||= Sidekiq::Queue.all.map(&:name) + # This method is called by `bin/sidekiq-cluster` in EE, which runs outside + # of bundler/Rails context, so we cannot use any gem or Rails methods. + def self.worker_queues(rails_path = Rails.root.to_s) + @worker_queues ||= {} + @worker_queues[rails_path] ||= YAML.load_file(File.join(rails_path, 'app/workers/all_queues.yml')) end # This method is called by `bin/sidekiq-cluster` in EE, which runs outside # of bundler/Rails context, so we cannot use any gem or Rails methods. - def self.config_queues(rails_path = Rails.root.to_s) + def self.expand_queues(queues, all_queues = self.worker_queues) + return [] if queues.empty? + + queues_set = all_queues.to_set + + queues.flat_map do |queue| + [queue, *queues_set.grep(/\A#{queue}:/)] + end + end + + def self.redis_queues + # Not memoized, because this can change during the life of the application + Sidekiq::Queue.all.map(&:name) + end + + def self.config_queues @config_queues ||= begin - config = YAML.load_file(File.join(rails_path, 'config', 'sidekiq_queues.yml')) + config = YAML.load_file(Rails.root.join('config/sidekiq_queues.yml')) config[:queues].map(&:first) end end @@ -23,14 +42,6 @@ module Gitlab @workers ||= find_workers(Rails.root.join('app', 'workers')) end - def self.default_queues - [ActionMailer::DeliveryJob.queue_name, 'default'] - end - - def self.worker_queues - @worker_queues ||= (workers.map(&:queue) + default_queues).uniq - end - def self.find_workers(root) concerns = root.join('concerns').to_s @@ -43,7 +54,7 @@ module Gitlab ns.camelize.constantize end - # Skip concerns + # Skip things that aren't workers workers.select { |w| w < Sidekiq::Worker } end end diff --git a/lib/gitlab/sidekiq_versioning.rb b/lib/gitlab/sidekiq_versioning.rb new file mode 100644 index 00000000000..9683214ec18 --- /dev/null +++ b/lib/gitlab/sidekiq_versioning.rb @@ -0,0 +1,25 @@ +module Gitlab + module SidekiqVersioning + def self.install! + Sidekiq::Manager.prepend SidekiqVersioning::Manager + + # The Sidekiq client API always adds the queue to the Sidekiq queue + # list, but mail_room and gitlab-shell do not. This is only necessary + # for monitoring. + begin + queues = SidekiqConfig.worker_queues + + if queues.any? + Sidekiq.redis do |conn| + conn.pipelined do + queues.each do |queue| + conn.sadd('queues', queue) + end + end + end + end + rescue ::Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED + end + end + end +end diff --git a/lib/gitlab/sidekiq_versioning/manager.rb b/lib/gitlab/sidekiq_versioning/manager.rb new file mode 100644 index 00000000000..308be0fdf76 --- /dev/null +++ b/lib/gitlab/sidekiq_versioning/manager.rb @@ -0,0 +1,12 @@ +module Gitlab + module SidekiqVersioning + module Manager + def initialize(options = {}) + options[:strict] = false + options[:queues] = SidekiqConfig.expand_queues(options[:queues]) + Sidekiq.logger.info "Listening on queues #{options[:queues].uniq.sort}" + super + end + end + end +end diff --git a/rubocop/cop/include_sidekiq_worker.rb b/rubocop/cop/include_sidekiq_worker.rb new file mode 100644 index 00000000000..4a6332286a2 --- /dev/null +++ b/rubocop/cop/include_sidekiq_worker.rb @@ -0,0 +1,29 @@ +require_relative '../spec_helpers' + +module RuboCop + module Cop + # Cop that makes sure workers include `ApplicationWorker`, not `Sidekiq::Worker`. + class IncludeSidekiqWorker < RuboCop::Cop::Cop + include SpecHelpers + + MSG = 'Include `ApplicationWorker`, not `Sidekiq::Worker`.'.freeze + + def_node_matcher :includes_sidekiq_worker?, <<~PATTERN + (send nil :include (const (const nil :Sidekiq) :Worker)) + PATTERN + + def on_send(node) + return if in_spec?(node) + return unless includes_sidekiq_worker?(node) + + add_offense(node.arguments.first, :expression) + end + + def autocorrect(node) + lambda do |corrector| + corrector.replace(node.source_range, 'ApplicationWorker') + end + end + end + end +end diff --git a/rubocop/cop/sidekiq_options_queue.rb b/rubocop/cop/sidekiq_options_queue.rb new file mode 100644 index 00000000000..43b35ba0214 --- /dev/null +++ b/rubocop/cop/sidekiq_options_queue.rb @@ -0,0 +1,27 @@ +require_relative '../spec_helpers' + +module RuboCop + module Cop + # Cop that prevents manually setting a queue in Sidekiq workers. + class SidekiqOptionsQueue < RuboCop::Cop::Cop + include SpecHelpers + + MSG = 'Do not manually set a queue; `ApplicationWorker` sets one automatically.'.freeze + + def_node_matcher :sidekiq_options?, <<~PATTERN + (send nil :sidekiq_options $...) + PATTERN + + def on_send(node) + return if in_spec?(node) + return unless sidekiq_options?(node) + + node.arguments.first.each_node(:pair) do |pair| + key_name = pair.key.children[0] + + add_offense(pair, :expression) if key_name == :queue + end + end + end + end +end diff --git a/rubocop/rubocop.rb b/rubocop/rubocop.rb index eb52be3d731..3e3b4c8349a 100644 --- a/rubocop/rubocop.rb +++ b/rubocop/rubocop.rb @@ -3,10 +3,12 @@ require_relative 'cop/active_record_serialize' require_relative 'cop/custom_error_class' require_relative 'cop/gem_fetcher' require_relative 'cop/in_batches' +require_relative 'cop/include_sidekiq_worker' require_relative 'cop/line_break_after_guard_clauses' require_relative 'cop/polymorphic_associations' require_relative 'cop/project_path_helper' require_relative 'cop/redirect_with_status' +require_relative 'cop/sidekiq_options_queue' require_relative 'cop/migration/add_column' require_relative 'cop/migration/add_concurrent_foreign_key' require_relative 'cop/migration/add_concurrent_index' diff --git a/spec/lib/gitlab/sidekiq_config_spec.rb b/spec/lib/gitlab/sidekiq_config_spec.rb index 09f95be2213..0c66d764851 100644 --- a/spec/lib/gitlab/sidekiq_config_spec.rb +++ b/spec/lib/gitlab/sidekiq_config_spec.rb @@ -16,9 +16,30 @@ describe Gitlab::SidekiqConfig do expect(queues).to include('post_receive') expect(queues).to include('merge') - expect(queues).to include('cronjob') + expect(queues).to include('cronjob:stuck_import_jobs') expect(queues).to include('mailers') expect(queues).to include('default') end end + + describe '.expand_queues' do + it 'expands queue namespaces to concrete queue names' do + queues = described_class.expand_queues(%w[cronjob]) + + expect(queues).to include('cronjob:stuck_import_jobs') + expect(queues).to include('cronjob:stuck_merge_jobs') + end + + it 'lets concrete queue names pass through' do + queues = described_class.expand_queues(%w[post_receive]) + + expect(queues).to include('post_receive') + end + + it 'lets unknown queues pass through' do + queues = described_class.expand_queues(%w[unknown]) + + expect(queues).to include('unknown') + end + end end diff --git a/spec/lib/gitlab/sidekiq_versioning/manager_spec.rb b/spec/lib/gitlab/sidekiq_versioning/manager_spec.rb new file mode 100644 index 00000000000..7debf70a16f --- /dev/null +++ b/spec/lib/gitlab/sidekiq_versioning/manager_spec.rb @@ -0,0 +1,22 @@ +require 'spec_helper' + +describe Gitlab::SidekiqVersioning::Manager do + before do + Sidekiq::Manager.prepend described_class + end + + describe '#initialize' do + it 'listens on all expanded queues' do + manager = Sidekiq::Manager.new(queues: %w[post_receive repository_fork cronjob unknown]) + + queues = manager.options[:queues] + + expect(queues).to include('post_receive') + expect(queues).to include('repository_fork') + expect(queues).to include('cronjob') + expect(queues).to include('cronjob:stuck_import_jobs') + expect(queues).to include('cronjob:stuck_merge_jobs') + expect(queues).to include('unknown') + end + end +end diff --git a/spec/lib/gitlab/sidekiq_versioning_spec.rb b/spec/lib/gitlab/sidekiq_versioning_spec.rb new file mode 100644 index 00000000000..fa6d42e730d --- /dev/null +++ b/spec/lib/gitlab/sidekiq_versioning_spec.rb @@ -0,0 +1,44 @@ +require 'spec_helper' + +describe Gitlab::SidekiqVersioning, :sidekiq, :redis do + let(:foo_worker) do + Class.new do + def self.name + 'FooWorker' + end + + include ApplicationWorker + end + end + + let(:bar_worker) do + Class.new do + def self.name + 'BarWorker' + end + + include ApplicationWorker + end + end + + before do + allow(Gitlab::SidekiqConfig).to receive(:workers).and_return([foo_worker, bar_worker]) + allow(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return([foo_worker.queue, bar_worker.queue]) + end + + describe '.install!' do + it 'prepends SidekiqVersioning::Manager into Sidekiq::Manager' do + described_class.install! + + expect(Sidekiq::Manager).to include(Gitlab::SidekiqVersioning::Manager) + end + + it 'registers all versionless and versioned queues with Redis' do + described_class.install! + + queues = Sidekiq::Queue.all.map(&:name) + expect(queues).to include('foo') + expect(queues).to include('bar') + end + end +end diff --git a/spec/rubocop/cop/include_sidekiq_worker_spec.rb b/spec/rubocop/cop/include_sidekiq_worker_spec.rb new file mode 100644 index 00000000000..7f406535dda --- /dev/null +++ b/spec/rubocop/cop/include_sidekiq_worker_spec.rb @@ -0,0 +1,31 @@ +require 'spec_helper' +require 'rubocop' +require 'rubocop/rspec/support' +require_relative '../../../rubocop/cop/include_sidekiq_worker' + +describe RuboCop::Cop::IncludeSidekiqWorker do + include CopHelper + + subject(:cop) { described_class.new } + + context 'when `Sidekiq::Worker` is included' do + let(:source) { 'include Sidekiq::Worker' } + let(:correct_source) { 'include ApplicationWorker' } + + it 'registers an offense ' do + inspect_source(cop, source) + + aggregate_failures do + expect(cop.offenses.size).to eq(1) + expect(cop.offenses.map(&:line)).to eq([1]) + expect(cop.highlights).to eq(['Sidekiq::Worker']) + end + end + + it 'autocorrects to the right version' do + autocorrected = autocorrect_source(cop, source) + + expect(autocorrected).to eq(correct_source) + end + end +end diff --git a/spec/rubocop/cop/sidekiq_options_queue_spec.rb b/spec/rubocop/cop/sidekiq_options_queue_spec.rb new file mode 100644 index 00000000000..a31de381631 --- /dev/null +++ b/spec/rubocop/cop/sidekiq_options_queue_spec.rb @@ -0,0 +1,26 @@ +require 'spec_helper' +require 'rubocop' +require 'rubocop/rspec/support' +require_relative '../../../rubocop/cop/sidekiq_options_queue' + +describe RuboCop::Cop::SidekiqOptionsQueue do + include CopHelper + + subject(:cop) { described_class.new } + + it 'registers an offense when `sidekiq_options` is used with the `queue` option' do + inspect_source(cop, 'sidekiq_options queue: "some_queue"') + + aggregate_failures do + expect(cop.offenses.size).to eq(1) + expect(cop.offenses.map(&:line)).to eq([1]) + expect(cop.highlights).to eq(['queue: "some_queue"']) + end + end + + it 'does not register an offense when `sidekiq_options` is used with another option' do + inspect_source(cop, 'sidekiq_options retry: false') + + expect(cop.offenses).to be_empty + end +end diff --git a/spec/workers/concerns/application_worker_spec.rb b/spec/workers/concerns/application_worker_spec.rb index 0145563e0ed..901d77178bc 100644 --- a/spec/workers/concerns/application_worker_spec.rb +++ b/spec/workers/concerns/application_worker_spec.rb @@ -17,6 +17,14 @@ describe ApplicationWorker do end end + describe '.queue_namespace' do + it 'sets the queue name based on the class name' do + worker.queue_namespace :some_namespace + + expect(worker.queue).to eq('some_namespace:foo_bar_dummy') + end + end + describe '.queue' do it 'returns the queue name' do worker.sidekiq_options queue: :some_queue diff --git a/spec/workers/concerns/cluster_queue_spec.rb b/spec/workers/concerns/cluster_queue_spec.rb index 5049886b55c..4118b9aa194 100644 --- a/spec/workers/concerns/cluster_queue_spec.rb +++ b/spec/workers/concerns/cluster_queue_spec.rb @@ -14,6 +14,6 @@ describe ClusterQueue do it 'sets a default pipelines queue automatically' do expect(worker.sidekiq_options['queue']) - .to eq :gcp_cluster + .to eq 'gcp_cluster:dummy' end end diff --git a/spec/workers/concerns/cronjob_queue_spec.rb b/spec/workers/concerns/cronjob_queue_spec.rb index 3ae1c5f54d8..c042a52f41f 100644 --- a/spec/workers/concerns/cronjob_queue_spec.rb +++ b/spec/workers/concerns/cronjob_queue_spec.rb @@ -13,7 +13,7 @@ describe CronjobQueue do end it 'sets the queue name of a worker' do - expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob') + expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob:dummy') end it 'disables retrying of failed jobs' do diff --git a/spec/workers/concerns/gitlab/github_import/queue_spec.rb b/spec/workers/concerns/gitlab/github_import/queue_spec.rb index 9c69ee32da1..a96f583aff7 100644 --- a/spec/workers/concerns/gitlab/github_import/queue_spec.rb +++ b/spec/workers/concerns/gitlab/github_import/queue_spec.rb @@ -11,6 +11,6 @@ describe Gitlab::GithubImport::Queue do include Gitlab::GithubImport::Queue end - expect(worker.sidekiq_options['queue']).to eq('github_importer') + expect(worker.sidekiq_options['queue']).to eq('github_importer:dummy') end end diff --git a/spec/workers/concerns/pipeline_queue_spec.rb b/spec/workers/concerns/pipeline_queue_spec.rb index dd911760948..a312b307fce 100644 --- a/spec/workers/concerns/pipeline_queue_spec.rb +++ b/spec/workers/concerns/pipeline_queue_spec.rb @@ -14,15 +14,6 @@ describe PipelineQueue do it 'sets a default pipelines queue automatically' do expect(worker.sidekiq_options['queue']) - .to eq 'pipeline_default' - end - - describe '.enqueue_in' do - it 'sets a custom sidekiq queue with prefix and group' do - worker.enqueue_in(group: :processing) - - expect(worker.sidekiq_options['queue']) - .to eq 'pipeline_processing' - end + .to eq 'pipeline_default:dummy' end end diff --git a/spec/workers/concerns/repository_check_queue_spec.rb b/spec/workers/concerns/repository_check_queue_spec.rb index fdbbfcc90a5..d2eeecfc9a8 100644 --- a/spec/workers/concerns/repository_check_queue_spec.rb +++ b/spec/workers/concerns/repository_check_queue_spec.rb @@ -13,7 +13,7 @@ describe RepositoryCheckQueue do end it 'sets the queue name of a worker' do - expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check') + expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check:dummy') end it 'disables retrying of failed jobs' do diff --git a/spec/workers/every_sidekiq_worker_spec.rb b/spec/workers/every_sidekiq_worker_spec.rb index 7ee0a51a263..9e3b99b3502 100644 --- a/spec/workers/every_sidekiq_worker_spec.rb +++ b/spec/workers/every_sidekiq_worker_spec.rb @@ -1,21 +1,36 @@ require 'spec_helper' describe 'Every Sidekiq worker' do - it 'includes ApplicationWorker' do - expect(Gitlab::SidekiqConfig.workers).to all(include(ApplicationWorker)) - end - it 'does not use the default queue' do expect(Gitlab::SidekiqConfig.workers.map(&:queue)).not_to include('default') end it 'uses the cronjob queue when the worker runs as a cronjob' do - expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(eq('cronjob')) + expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(start_with('cronjob:')) + end + + it 'has its queue in app/workers/all_queues.yml', :aggregate_failures do + file_worker_queues = Gitlab::SidekiqConfig.worker_queues.to_set + + worker_queues = Gitlab::SidekiqConfig.workers.map(&:queue).to_set + worker_queues << ActionMailer::DeliveryJob.queue_name + worker_queues << 'default' + + missing_from_file = worker_queues - file_worker_queues + expect(missing_from_file).to be_empty, "expected #{missing_from_file.to_a.inspect} to be in app/workers/all_queues.yml" + + unncessarily_in_file = file_worker_queues - worker_queues + expect(unncessarily_in_file).to be_empty, "expected #{unncessarily_in_file.to_a.inspect} not to be in app/workers/all_queues.yml" end - it 'defines the queue in the Sidekiq configuration file' do - config_queue_names = Gitlab::SidekiqConfig.config_queues.to_set + it 'has its queue or namespace in config/sidekiq_queues.yml', :aggregate_failures do + config_queues = Gitlab::SidekiqConfig.config_queues.to_set + + Gitlab::SidekiqConfig.workers.each do |worker| + queue = worker.queue + queue_namespace = queue.split(':').first - expect(Gitlab::SidekiqConfig.worker_queues).to all(be_in(config_queue_names)) + expect(config_queues).to include(queue).or(include(queue_namespace)) + end end end |