diff options
author | Grzegorz Bizon <grzegorz@gitlab.com> | 2017-08-28 10:08:02 +0000 |
---|---|---|
committer | Grzegorz Bizon <grzegorz@gitlab.com> | 2017-08-28 10:08:02 +0000 |
commit | d1eda393f2b42832d2b5c5fbcdc03ee7e568cdd5 (patch) | |
tree | 39bef1e3660d4cabc4a6cd48e3edfe09e66929ed | |
parent | 922eb6d331ba26a8e89a390f528a845dbf195e44 (diff) | |
parent | 15ace6a910ecb889c58fd4838a4f9a9b710178c7 (diff) | |
download | gitlab-ce-d1eda393f2b42832d2b5c5fbcdc03ee7e568cdd5.tar.gz |
Merge branch 'backstage/gb/rename-ci-cd-processing-sidekiq-queues' into 'master'
Rename CI/CD related Sidekiq queues
Closes #35532
See merge request !13714
22 files changed, 197 insertions, 35 deletions
diff --git a/app/workers/build_coverage_worker.rb b/app/workers/build_coverage_worker.rb index f7ae996bb17..cd4af85d047 100644 --- a/app/workers/build_coverage_worker.rb +++ b/app/workers/build_coverage_worker.rb @@ -1,6 +1,6 @@ class BuildCoverageWorker include Sidekiq::Worker - include BuildQueue + include PipelineQueue def perform(build_id) Ci::Build.find_by(id: build_id)&.update_coverage diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb index 466410bf08c..e2a1b3dcc41 100644 --- a/app/workers/build_finished_worker.rb +++ b/app/workers/build_finished_worker.rb @@ -1,6 +1,8 @@ class BuildFinishedWorker include Sidekiq::Worker - include BuildQueue + include PipelineQueue + + enqueue_in group: :processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb index 9965af935d4..dedaf2835e6 100644 --- a/app/workers/build_hooks_worker.rb +++ b/app/workers/build_hooks_worker.rb @@ -1,6 +1,8 @@ class BuildHooksWorker include Sidekiq::Worker - include BuildQueue + include PipelineQueue + + enqueue_in group: :hooks def perform(build_id) Ci::Build.find_by(id: build_id) diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb index fa9e097e40a..e5ceb9ef715 100644 --- a/app/workers/build_queue_worker.rb +++ b/app/workers/build_queue_worker.rb @@ -1,6 +1,8 @@ class BuildQueueWorker include Sidekiq::Worker - include BuildQueue + include PipelineQueue + + enqueue_in group: :processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb index bf009dfab0f..20ec24bd18a 100644 --- a/app/workers/build_success_worker.rb +++ b/app/workers/build_success_worker.rb @@ -1,6 +1,8 @@ class BuildSuccessWorker include Sidekiq::Worker - include BuildQueue + include PipelineQueue + + enqueue_in group: :processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/concerns/build_queue.rb b/app/workers/concerns/build_queue.rb deleted file mode 100644 index cf0ead40a8b..00000000000 --- a/app/workers/concerns/build_queue.rb +++ /dev/null @@ -1,8 +0,0 @@ -# Concern for setting Sidekiq settings for the various CI build workers. -module BuildQueue - extend ActiveSupport::Concern - - included do - sidekiq_options queue: :build - end -end diff --git a/app/workers/concerns/pipeline_queue.rb b/app/workers/concerns/pipeline_queue.rb index ca3860e1d38..ddf45b91345 100644 --- a/app/workers/concerns/pipeline_queue.rb +++ b/app/workers/concerns/pipeline_queue.rb @@ -1,8 +1,18 @@ +## # Concern for setting Sidekiq settings for the various CI pipeline workers. +# module PipelineQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :pipeline + sidekiq_options queue: 'pipeline_default' + end + + class_methods do + def enqueue_in(group:) + raise ArgumentError, 'Unspecified queue group!' if group.empty? + + sidekiq_options queue: "pipeline_#{group}" + end end end diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb index e383202260d..98a7500bffe 100644 --- a/app/workers/expire_job_cache_worker.rb +++ b/app/workers/expire_job_cache_worker.rb @@ -1,6 +1,8 @@ class ExpireJobCacheWorker include Sidekiq::Worker - include BuildQueue + include PipelineQueue + + enqueue_in group: :cache def perform(job_id) job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id) diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb index 7c02d6cf892..1a0e7f92875 100644 --- a/app/workers/expire_pipeline_cache_worker.rb +++ b/app/workers/expire_pipeline_cache_worker.rb @@ -2,6 +2,8 @@ class ExpirePipelineCacheWorker include Sidekiq::Worker include PipelineQueue + enqueue_in group: :cache + def perform(pipeline_id) pipeline = Ci::Pipeline.find_by(id: pipeline_id) return unless pipeline diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb index 7e36eacebf8..30a75ec8435 100644 --- a/app/workers/pipeline_hooks_worker.rb +++ b/app/workers/pipeline_hooks_worker.rb @@ -2,6 +2,8 @@ class PipelineHooksWorker include Sidekiq::Worker include PipelineQueue + enqueue_in group: :hooks + def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) .try(:execute_hooks) diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb index 357e4a9a1c3..8c067d05081 100644 --- a/app/workers/pipeline_process_worker.rb +++ b/app/workers/pipeline_process_worker.rb @@ -2,6 +2,8 @@ class PipelineProcessWorker include Sidekiq::Worker include PipelineQueue + enqueue_in group: :processing + def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) .try(:process!) diff --git a/app/workers/pipeline_success_worker.rb b/app/workers/pipeline_success_worker.rb index cc0eb708cf9..cb8bb2ffe75 100644 --- a/app/workers/pipeline_success_worker.rb +++ b/app/workers/pipeline_success_worker.rb @@ -2,6 +2,8 @@ class PipelineSuccessWorker include Sidekiq::Worker include PipelineQueue + enqueue_in group: :processing + def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| MergeRequests::MergeWhenPipelineSucceedsService diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb index 96c4152c674..5fa399dff4c 100644 --- a/app/workers/pipeline_update_worker.rb +++ b/app/workers/pipeline_update_worker.rb @@ -2,6 +2,8 @@ class PipelineUpdateWorker include Sidekiq::Worker include PipelineQueue + enqueue_in group: :processing + def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) .try(:update_status) diff --git a/app/workers/stage_update_worker.rb b/app/workers/stage_update_worker.rb index eef0b11e70b..c301cea5ad6 100644 --- a/app/workers/stage_update_worker.rb +++ b/app/workers/stage_update_worker.rb @@ -2,6 +2,8 @@ class StageUpdateWorker include Sidekiq::Worker include PipelineQueue + enqueue_in group: :processing + def perform(stage_id) Ci::Stage.find_by(id: stage_id).try do |stage| stage.update_status diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 83abc83c9f0..24c001362c6 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -27,6 +27,10 @@ - [new_merge_request, 2] - [build, 2] - [pipeline, 2] + - [pipeline_processing, 5] + - [pipeline_default, 3] + - [pipeline_cache, 3] + - [pipeline_hooks, 2] - [gitlab_shell, 2] - [email_receiver, 2] - [emails_on_push, 2] diff --git a/db/post_migrate/20170822101017_migrate_pipeline_sidekiq_queues.rb b/db/post_migrate/20170822101017_migrate_pipeline_sidekiq_queues.rb new file mode 100644 index 00000000000..8441cfe7968 --- /dev/null +++ b/db/post_migrate/20170822101017_migrate_pipeline_sidekiq_queues.rb @@ -0,0 +1,17 @@ +class MigratePipelineSidekiqQueues < ActiveRecord::Migration + include Gitlab::Database::MigrationHelpers + + DOWNTIME = false + + def up + sidekiq_queue_migrate 'build', to: 'pipeline_default' + sidekiq_queue_migrate 'pipeline', to: 'pipeline_default' + end + + def down + sidekiq_queue_migrate 'pipeline_default', to: 'pipeline' + sidekiq_queue_migrate 'pipeline_processing', to: 'pipeline' + sidekiq_queue_migrate 'pipeline_hooks', to: 'pipeline' + sidekiq_queue_migrate 'pipeline_cache', to: 'pipeline' + end +end diff --git a/lib/gitlab/database/migration_helpers.rb b/lib/gitlab/database/migration_helpers.rb index b83e633c7ed..5e2c6cc5cad 100644 --- a/lib/gitlab/database/migration_helpers.rb +++ b/lib/gitlab/database/migration_helpers.rb @@ -611,6 +611,20 @@ module Gitlab remove_foreign_key(*args) rescue ArgumentError end + + def sidekiq_queue_migrate(queue_from, to:) + while sidekiq_queue_length(queue_from) > 0 + Sidekiq.redis do |conn| + conn.rpoplpush "queue:#{queue_from}", "queue:#{to}" + end + end + end + + def sidekiq_queue_length(queue_name) + Sidekiq.redis do |conn| + conn.llen("queue:#{queue_name}") + end + end end end end diff --git a/spec/lib/gitlab/database/migration_helpers_spec.rb b/spec/lib/gitlab/database/migration_helpers_spec.rb index ec2274a70aa..c25fd459dd7 100644 --- a/spec/lib/gitlab/database/migration_helpers_spec.rb +++ b/spec/lib/gitlab/database/migration_helpers_spec.rb @@ -2,9 +2,7 @@ require 'spec_helper' describe Gitlab::Database::MigrationHelpers do let(:model) do - ActiveRecord::Migration.new.extend( - described_class - ) + ActiveRecord::Migration.new.extend(described_class) end before do @@ -845,4 +843,51 @@ describe Gitlab::Database::MigrationHelpers do end end end + + describe 'sidekiq migration helpers', :sidekiq, :redis do + let(:worker) do + Class.new do + include Sidekiq::Worker + sidekiq_options queue: 'test' + end + end + + describe '#sidekiq_queue_length' do + context 'when queue is empty' do + it 'returns zero' do + Sidekiq::Testing.disable! do + expect(model.sidekiq_queue_length('test')).to eq 0 + end + end + end + + context 'when queue contains jobs' do + it 'returns correct size of the queue' do + Sidekiq::Testing.disable! do + worker.perform_async('Something', [1]) + worker.perform_async('Something', [2]) + + expect(model.sidekiq_queue_length('test')).to eq 2 + end + end + end + end + + describe '#migrate_sidekiq_queue' do + it 'migrates jobs from one sidekiq queue to another' do + Sidekiq::Testing.disable! do + worker.perform_async('Something', [1]) + worker.perform_async('Something', [2]) + + expect(model.sidekiq_queue_length('test')).to eq 2 + expect(model.sidekiq_queue_length('new_test')).to eq 0 + + model.sidekiq_queue_migrate('test', to: 'new_test') + + expect(model.sidekiq_queue_length('test')).to eq 0 + expect(model.sidekiq_queue_length('new_test')).to eq 2 + end + end + end + end end diff --git a/spec/migrations/migrate_pipeline_sidekiq_queues_spec.rb b/spec/migrations/migrate_pipeline_sidekiq_queues_spec.rb new file mode 100644 index 00000000000..e02bcd2f4da --- /dev/null +++ b/spec/migrations/migrate_pipeline_sidekiq_queues_spec.rb @@ -0,0 +1,55 @@ +require 'spec_helper' +require Rails.root.join('db', 'post_migrate', '20170822101017_migrate_pipeline_sidekiq_queues.rb') + +describe MigratePipelineSidekiqQueues, :sidekiq, :redis do + include Gitlab::Database::MigrationHelpers + + context 'when there are jobs in the queues' do + it 'correctly migrates queue when migrating up' do + Sidekiq::Testing.disable! do + stubbed_worker(queue: :pipeline).perform_async('Something', [1]) + stubbed_worker(queue: :build).perform_async('Something', [1]) + + described_class.new.up + + expect(sidekiq_queue_length('pipeline')).to eq 0 + expect(sidekiq_queue_length('build')).to eq 0 + expect(sidekiq_queue_length('pipeline_default')).to eq 2 + end + end + + it 'correctly migrates queue when migrating down' do + Sidekiq::Testing.disable! do + stubbed_worker(queue: :pipeline_default).perform_async('Class', [1]) + stubbed_worker(queue: :pipeline_processing).perform_async('Class', [2]) + stubbed_worker(queue: :pipeline_hooks).perform_async('Class', [3]) + stubbed_worker(queue: :pipeline_cache).perform_async('Class', [4]) + + described_class.new.down + + expect(sidekiq_queue_length('pipeline')).to eq 4 + expect(sidekiq_queue_length('pipeline_default')).to eq 0 + expect(sidekiq_queue_length('pipeline_processing')).to eq 0 + expect(sidekiq_queue_length('pipeline_hooks')).to eq 0 + expect(sidekiq_queue_length('pipeline_cache')).to eq 0 + end + end + end + + context 'when there are no jobs in the queues' do + it 'does not raise error when migrating up' do + expect { described_class.new.up }.not_to raise_error + end + + it 'does not raise error when migrating down' do + expect { described_class.new.down }.not_to raise_error + end + end + + def stubbed_worker(queue:) + Class.new do + include Sidekiq::Worker + sidekiq_options queue: queue + end + end +end diff --git a/spec/workers/concerns/build_queue_spec.rb b/spec/workers/concerns/build_queue_spec.rb deleted file mode 100644 index 6bf955e0be2..00000000000 --- a/spec/workers/concerns/build_queue_spec.rb +++ /dev/null @@ -1,14 +0,0 @@ -require 'spec_helper' - -describe BuildQueue do - let(:worker) do - Class.new do - include Sidekiq::Worker - include BuildQueue - end - end - - it 'sets the queue name of a worker' do - expect(worker.sidekiq_options['queue'].to_s).to eq('build') - end -end diff --git a/spec/workers/concerns/pipeline_queue_spec.rb b/spec/workers/concerns/pipeline_queue_spec.rb index 40794d0e42a..eac5a770e5f 100644 --- a/spec/workers/concerns/pipeline_queue_spec.rb +++ b/spec/workers/concerns/pipeline_queue_spec.rb @@ -8,7 +8,17 @@ describe PipelineQueue do end end - it 'sets the queue name of a worker' do - expect(worker.sidekiq_options['queue'].to_s).to eq('pipeline') + it 'sets a default pipelines queue automatically' do + expect(worker.sidekiq_options['queue']) + .to eq 'pipeline_default' + end + + describe '.enqueue_in' do + it 'sets a custom sidekiq queue with prefix and group' do + worker.enqueue_in(group: :processing) + + expect(worker.sidekiq_options['queue']) + .to eq 'pipeline_processing' + end end end diff --git a/spec/workers/pipeline_metrics_worker_spec.rb b/spec/workers/pipeline_metrics_worker_spec.rb index ef71125c0b6..896f9e6e7f2 100644 --- a/spec/workers/pipeline_metrics_worker_spec.rb +++ b/spec/workers/pipeline_metrics_worker_spec.rb @@ -2,7 +2,12 @@ require 'spec_helper' describe PipelineMetricsWorker do let(:project) { create(:project, :repository) } - let!(:merge_request) { create(:merge_request, source_project: project, source_branch: pipeline.ref, head_pipeline: pipeline) } + + let!(:merge_request) do + create(:merge_request, source_project: project, + source_branch: pipeline.ref, + head_pipeline: pipeline) + end let(:pipeline) do create(:ci_empty_pipeline, @@ -14,6 +19,8 @@ describe PipelineMetricsWorker do finished_at: Time.now) end + let(:status) { 'pending' } + describe '#perform' do before do described_class.new.perform(pipeline.id) |