diff options
Diffstat (limited to 'app')
22 files changed, 144 insertions, 30 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml new file mode 100644 index 00000000000..ba31a5aa9c2 --- /dev/null +++ b/app/workers/all_queues.yml @@ -0,0 +1,98 @@ +--- +- cronjob:admin_email +- cronjob:expire_build_artifacts +- cronjob:gitlab_usage_ping +- cronjob:import_export_project_cleanup +- cronjob:pipeline_schedule +- cronjob:prune_old_events +- cronjob:remove_expired_group_links +- cronjob:remove_expired_members +- cronjob:remove_old_web_hook_logs +- cronjob:remove_unreferenced_lfs_objects +- cronjob:repository_archive_cache +- cronjob:repository_check_batch +- cronjob:requests_profiles +- cronjob:schedule_update_user_activity +- cronjob:stuck_ci_jobs +- cronjob:stuck_import_jobs +- cronjob:stuck_merge_jobs +- cronjob:trending_projects + +- gcp_cluster:cluster_install_app +- gcp_cluster:cluster_provision +- gcp_cluster:cluster_wait_for_app_installation +- gcp_cluster:wait_for_cluster_creation + +- github_import_advance_stage +- github_importer:github_import_import_diff_note +- github_importer:github_import_import_issue +- github_importer:github_import_import_note +- github_importer:github_import_import_pull_request +- github_importer:github_import_refresh_import_jid +- github_importer:github_import_stage_finish_import +- github_importer:github_import_stage_import_base_data +- github_importer:github_import_stage_import_issues_and_diff_notes +- github_importer:github_import_stage_import_notes +- github_importer:github_import_stage_import_pull_requests +- github_importer:github_import_stage_import_repository + +- pipeline_cache:expire_job_cache +- pipeline_cache:expire_pipeline_cache +- pipeline_creation:create_pipeline +- pipeline_default:build_coverage +- pipeline_default:build_trace_sections +- pipeline_default:pipeline_metrics +- pipeline_default:pipeline_notification +- pipeline_default:update_head_pipeline_for_merge_request +- pipeline_hooks:build_hooks +- pipeline_hooks:pipeline_hooks +- pipeline_processing:build_finished +- pipeline_processing:build_queue +- pipeline_processing:build_success +- pipeline_processing:pipeline_process +- pipeline_processing:pipeline_success +- pipeline_processing:pipeline_update +- pipeline_processing:stage_update + +- repository_check:repository_check_clear +- repository_check:repository_check_single_repository + +- default +- mailers # ActionMailer::DeliveryJob.queue_name + +- authorized_projects +- background_migration +- create_gpg_signature +- delete_merged_branches +- delete_user +- email_receiver +- emails_on_push +- expire_build_instance_artifacts +- git_garbage_collect +- gitlab_shell +- group_destroy +- invalid_gpg_signature_update +- irker +- merge +- namespaceless_project_destroy +- new_issue +- new_merge_request +- new_note +- pages +- post_receive +- process_commit +- project_cache +- project_destroy +- project_export +- project_migrate_hashed_storage +- project_service +- propagate_service_template +- reactive_caching +- repository_fork +- repository_import +- storage_migrator +- system_hook_push +- update_merge_requests +- update_user_activity +- upload_checksum +- web_hook diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb index 5efa9180f5e..97d80305bec 100644 --- a/app/workers/build_finished_worker.rb +++ b/app/workers/build_finished_worker.rb @@ -2,7 +2,7 @@ class BuildFinishedWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb index 6705a1c2709..cbfca8c342c 100644 --- a/app/workers/build_hooks_worker.rb +++ b/app/workers/build_hooks_worker.rb @@ -2,7 +2,7 @@ class BuildHooksWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :hooks + queue_namespace :pipeline_hooks def perform(build_id) Ci::Build.find_by(id: build_id) diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb index fc775a84dc0..e4f4e6c1d9e 100644 --- a/app/workers/build_queue_worker.rb +++ b/app/workers/build_queue_worker.rb @@ -2,7 +2,7 @@ class BuildQueueWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb index ec049821ad7..4b9097bc5e4 100644 --- a/app/workers/build_success_worker.rb +++ b/app/workers/build_success_worker.rb @@ -2,7 +2,7 @@ class BuildSuccessWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index 9c3bdabc49e..285b5bada7d 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -6,10 +6,20 @@ module ApplicationWorker include Sidekiq::Worker included do - sidekiq_options queue: base_queue_name + set_queue end module ClassMethods + def inherited(subclass) + subclass.set_queue + end + + def set_queue + queue_name = [queue_namespace, base_queue_name].compact.join(':') + + sidekiq_options queue: queue_name + end + def base_queue_name name .sub(/\AGitlab::/, '') @@ -18,6 +28,16 @@ module ApplicationWorker .tr('/', '_') end + def queue_namespace(new_namespace = nil) + if new_namespace + sidekiq_options queue_namespace: new_namespace + + set_queue + else + get_sidekiq_options['queue_namespace']&.to_s + end + end + def queue get_sidekiq_options['queue'].to_s end diff --git a/app/workers/concerns/cluster_queue.rb b/app/workers/concerns/cluster_queue.rb index a5074d13220..24b9f145220 100644 --- a/app/workers/concerns/cluster_queue.rb +++ b/app/workers/concerns/cluster_queue.rb @@ -5,6 +5,6 @@ module ClusterQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :gcp_cluster + queue_namespace :gcp_cluster end end diff --git a/app/workers/concerns/cronjob_queue.rb b/app/workers/concerns/cronjob_queue.rb index e918bb011e0..b6581779f6a 100644 --- a/app/workers/concerns/cronjob_queue.rb +++ b/app/workers/concerns/cronjob_queue.rb @@ -4,6 +4,7 @@ module CronjobQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :cronjob, retry: false + queue_namespace :cronjob + sidekiq_options retry: false end end diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb index a2bee361b86..22c2ce458e8 100644 --- a/app/workers/concerns/gitlab/github_import/queue.rb +++ b/app/workers/concerns/gitlab/github_import/queue.rb @@ -4,12 +4,14 @@ module Gitlab extend ActiveSupport::Concern included do + queue_namespace :github_importer + # If a job produces an error it may block a stage from advancing # forever. To prevent this from happening we prevent jobs from going to # the dead queue. This does mean some resources may not be imported, but # this is better than a project being stuck in the "import" state # forever. - sidekiq_options queue: 'github_importer', dead: false, retry: 5 + sidekiq_options dead: false, retry: 5 end end end diff --git a/app/workers/concerns/pipeline_queue.rb b/app/workers/concerns/pipeline_queue.rb index ddf45b91345..e77093a6902 100644 --- a/app/workers/concerns/pipeline_queue.rb +++ b/app/workers/concerns/pipeline_queue.rb @@ -5,14 +5,6 @@ module PipelineQueue extend ActiveSupport::Concern included do - sidekiq_options queue: 'pipeline_default' - end - - class_methods do - def enqueue_in(group:) - raise ArgumentError, 'Unspecified queue group!' if group.empty? - - sidekiq_options queue: "pipeline_#{group}" - end + queue_namespace :pipeline_default end end diff --git a/app/workers/concerns/repository_check_queue.rb b/app/workers/concerns/repository_check_queue.rb index a597321ccf4..43fb66c31b0 100644 --- a/app/workers/concerns/repository_check_queue.rb +++ b/app/workers/concerns/repository_check_queue.rb @@ -3,6 +3,8 @@ module RepositoryCheckQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :repository_check, retry: false + queue_namespace :repository_check + + sidekiq_options retry: false end end diff --git a/app/workers/create_pipeline_worker.rb b/app/workers/create_pipeline_worker.rb index 00cd7b85b9f..c3ac35e54f5 100644 --- a/app/workers/create_pipeline_worker.rb +++ b/app/workers/create_pipeline_worker.rb @@ -2,7 +2,7 @@ class CreatePipelineWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :creation + queue_namespace :pipeline_creation def perform(project_id, user_id, ref, source, params = {}) project = Project.find(project_id) diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb index a591e2da519..7217364a9f2 100644 --- a/app/workers/expire_job_cache_worker.rb +++ b/app/workers/expire_job_cache_worker.rb @@ -2,7 +2,7 @@ class ExpireJobCacheWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :cache + queue_namespace :pipeline_cache def perform(job_id) job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id) diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb index a3ac32b437d..3e34de22c19 100644 --- a/app/workers/expire_pipeline_cache_worker.rb +++ b/app/workers/expire_pipeline_cache_worker.rb @@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :cache + queue_namespace :pipeline_cache def perform(pipeline_id) pipeline = Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb index 400396d5755..f7f498af840 100644 --- a/app/workers/gitlab/github_import/advance_stage_worker.rb +++ b/app/workers/gitlab/github_import/advance_stage_worker.rb @@ -9,7 +9,7 @@ module Gitlab class AdvanceStageWorker include ApplicationWorker - sidekiq_options queue: 'github_importer_advance_stage', dead: false + sidekiq_options dead: false INTERVAL = 30.seconds.to_i diff --git a/app/workers/pages_worker.rb b/app/workers/pages_worker.rb index 62f733c02fc..3ec81d040b4 100644 --- a/app/workers/pages_worker.rb +++ b/app/workers/pages_worker.rb @@ -1,7 +1,7 @@ class PagesWorker include ApplicationWorker - sidekiq_options queue: :pages, retry: false + sidekiq_options retry: false def perform(action, *arg) send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb index 661c29efe88..c94918ff4ee 100644 --- a/app/workers/pipeline_hooks_worker.rb +++ b/app/workers/pipeline_hooks_worker.rb @@ -2,7 +2,7 @@ class PipelineHooksWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :hooks + queue_namespace :pipeline_hooks def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb index 07dbf6a971e..24424b3f472 100644 --- a/app/workers/pipeline_process_worker.rb +++ b/app/workers/pipeline_process_worker.rb @@ -2,7 +2,7 @@ class PipelineProcessWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/pipeline_success_worker.rb b/app/workers/pipeline_success_worker.rb index 68c40a259e1..2ab0739a17f 100644 --- a/app/workers/pipeline_success_worker.rb +++ b/app/workers/pipeline_success_worker.rb @@ -2,7 +2,7 @@ class PipelineSuccessWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb index 24a8a9fbed5..fc9da2d45b1 100644 --- a/app/workers/pipeline_update_worker.rb +++ b/app/workers/pipeline_update_worker.rb @@ -2,7 +2,7 @@ class PipelineUpdateWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/stage_update_worker.rb b/app/workers/stage_update_worker.rb index 69f2318d83b..e4b683fca33 100644 --- a/app/workers/stage_update_worker.rb +++ b/app/workers/stage_update_worker.rb @@ -2,7 +2,7 @@ class StageUpdateWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(stage_id) Ci::Stage.find_by(id: stage_id).try do |stage| diff --git a/app/workers/update_head_pipeline_for_merge_request_worker.rb b/app/workers/update_head_pipeline_for_merge_request_worker.rb index 0a2e9b63578..f4d2ec28bea 100644 --- a/app/workers/update_head_pipeline_for_merge_request_worker.rb +++ b/app/workers/update_head_pipeline_for_merge_request_worker.rb @@ -1,7 +1,6 @@ class UpdateHeadPipelineForMergeRequestWorker include ApplicationWorker - - sidekiq_options queue: 'pipeline_default' + include PipelineQueue def perform(merge_request_id) merge_request = MergeRequest.find(merge_request_id) |