diff options
author | Shinya Maeda <shinya@gitlab.com> | 2019-08-01 15:50:05 +0700 |
---|---|---|
committer | Shinya Maeda <shinya@gitlab.com> | 2019-08-20 14:25:54 +0700 |
commit | 5b1ff96f4d00ddc65a287d428c83308758622d80 (patch) | |
tree | 6d05cc742c7ed71eb065b468a9f3c0de3807b301 | |
parent | da9606512846aca61fb52f8afd6e9742426f8e3a (diff) | |
download | gitlab-ce-de-duplicate-pipeline-process-sidekiq-jobs.tar.gz |
De-duplicate pipeline process sidekiq jobsde-duplicate-pipeline-process-sidekiq-jobs
This commits deduplicates unnecessary pipeline process sidekiq jobs
by using a batch pop queueing system.
This way, the duplicate process will be processed later only once.
-rw-r--r-- | app/workers/concerns/sidekiq_job_deduplicater.rb | 73 | ||||
-rw-r--r-- | app/workers/pipeline_process_worker.rb | 7 | ||||
-rw-r--r-- | changelogs/unreleased/de-duplicate-pipeline-process-sidekiq-jobs.yml | 5 | ||||
-rw-r--r-- | spec/workers/concerns/sidekiq_job_deduplicater_spec.rb | 65 | ||||
-rw-r--r-- | spec/workers/pipeline_process_worker_spec.rb | 52 | ||||
-rw-r--r-- | spec/workers/stuck_ci_jobs_worker_spec.rb | 1 |
6 files changed, 197 insertions, 6 deletions
diff --git a/app/workers/concerns/sidekiq_job_deduplicater.rb b/app/workers/concerns/sidekiq_job_deduplicater.rb new file mode 100644 index 00000000000..aef0d674f20 --- /dev/null +++ b/app/workers/concerns/sidekiq_job_deduplicater.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +## +# Deduplicate sidekiq jobs with BatchPopQueueing. +# +# Often a sidekiq worker performs on the exact same argument for multiple times. +# This might make sense, however, it could be a waste of compute resource that +# sidekiq jobs are basically idempotent and results will likely be the same. Also, +# it could cause an unnecessary race condition that multiple jobs access the same +# data asynchronously. +# +# We can resolve such concerns by `SidekiqJobDeduplicater`, which +# processes a single item exclusively. When multiple sidekiq jobs are initiated +# with **the same argument**, the first item is executed and the following items +# are enqueued into a batch-pop-queue instead of an immediate execution. +# In this enqueueing process, the system removes duplicate items proactively, thus +# the next item is always one, becuase we enqueue the same item to the queue. +# Once sidekiq job finished to work on the first item, it works on the +# same item again, which ensures that all sidekiq jobs are processed after +# the enqueueing. +# +# NOTE: +# - Currently, this class is not compatible with multiple arguments in sidekiq classes. +# - If the sidekiq worker is often killed by sidekiq memory killer due to the memory consumption, +# the exclusive lock might linger in Redis and it could interrupt the next execution. +module SidekiqJobDeduplicater + extend ActiveSupport::Concern + extend ::Gitlab::Utils::Override + + prepended do + class_attribute :deduplicater_default_enabled + class_attribute :deduplicater_lock_timeout + + # The deduplicater is behind a feature flag and you can disable the behavior + # by disabling the feature flag. + # The deduplicater is enabled by default, if you want to disable by default, + # set `false` to the `deduplicater_default_enabled` vaule. + self.deduplicater_default_enabled = true + + # The deduplicater runs the process in an exclusive lock and while the lock + # is effective the duplicate sidekiq jobs will be absorbed or defered after + # the current process finishes. + # Basically, you should set `deduplicater_lock_timeout` a greater vaule than + # the maximum execution time of the sidekiq job. + self.deduplicater_lock_timeout = 10.minutes + end + + override :perform + def perform(arg) + if Feature.enabled?(feature_flag_name, default_enabled: deduplicater_default_enabled) + result = Gitlab::BatchPopQueueing.new(sanitized_worker_name, arg.to_s) + .safe_execute([arg], lock_timeout: deduplicater_lock_timeout) do |items| + super(items.first) + end + + if result[:status] == :finished && result[:new_items].present? + self.class.perform_async(result[:new_items].first) + end + else + super(arg) + end + end + + private + + def sanitized_worker_name + self.class.name.underscore + end + + def feature_flag_name + "enable_deduplicater_for_#{sanitized_worker_name}" + end +end diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb index 96524d93f8d..6caa744bb86 100644 --- a/app/workers/pipeline_process_worker.rb +++ b/app/workers/pipeline_process_worker.rb @@ -3,14 +3,15 @@ class PipelineProcessWorker include ApplicationWorker include PipelineQueue + prepend SidekiqJobDeduplicater queue_namespace :pipeline_processing - # rubocop: disable CodeReuse/ActiveRecord + self.deduplicater_default_enabled = false + def perform(pipeline_id, build_ids = nil) - Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| + Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline| pipeline.process!(build_ids) end end - # rubocop: enable CodeReuse/ActiveRecord end diff --git a/changelogs/unreleased/de-duplicate-pipeline-process-sidekiq-jobs.yml b/changelogs/unreleased/de-duplicate-pipeline-process-sidekiq-jobs.yml new file mode 100644 index 00000000000..63a0512e8a7 --- /dev/null +++ b/changelogs/unreleased/de-duplicate-pipeline-process-sidekiq-jobs.yml @@ -0,0 +1,5 @@ +--- +title: De-duplicate pipeline process sidekiq jobs +merge_request: 31370 +author: +type: performance diff --git a/spec/workers/concerns/sidekiq_job_deduplicater_spec.rb b/spec/workers/concerns/sidekiq_job_deduplicater_spec.rb new file mode 100644 index 00000000000..15fff5330c9 --- /dev/null +++ b/spec/workers/concerns/sidekiq_job_deduplicater_spec.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe SidekiqJobDeduplicater, :clean_gitlab_redis_shared_state do + let(:worker_class) { PipelineProcessWorker } + let(:worker) { worker_class.new } + let(:arg) { 1 } + + subject { worker.perform(arg) } + + it 'goes through Gitlab::BatchPopQueueing' do + queue = double(Gitlab::BatchPopQueueing) + + allow(Gitlab::BatchPopQueueing) + .to receive(:new).with('pipeline_process_worker', arg.to_s) { queue } + + expect(queue) + .to receive(:safe_execute).with([arg], lock_timeout: 10.minutes) { { status: :finished } } + + subject + end + + context 'when the result of BatchPopQueueing has new items' do + before do + allow_any_instance_of(Gitlab::BatchPopQueueing) + .to receive(:safe_execute) { { status: :finished, new_items: [1] } } + end + + it 'performs the additional item asynchronously' do + expect(worker_class).to receive(:perform_async).with(1) + + subject + end + end + + context 'when deduplicater_lock_timeout is 1 hour' do + before do + worker.deduplicater_lock_timeout = 1.hour + end + + it 'sets 1 hour to lock timeout' do + expect_next_instance_of(Gitlab::BatchPopQueueing) do |queue| + expect(queue).to receive(:safe_execute).with([arg], lock_timeout: 1.hour) + .and_call_original + end + + subject + end + end + + context 'when deduplicater_default_enabled is false' do + before do + worker.deduplicater_default_enabled = false + end + + it 'sets default_enabled false to the feature flag' do + expect(Feature) + .to receive(:enabled?).with('enable_deduplicater_for_pipeline_process_worker', + default_enabled: false) { false } + + subject + end + end +end diff --git a/spec/workers/pipeline_process_worker_spec.rb b/spec/workers/pipeline_process_worker_spec.rb index ac677e3b555..aaa37b7f464 100644 --- a/spec/workers/pipeline_process_worker_spec.rb +++ b/spec/workers/pipeline_process_worker_spec.rb @@ -2,11 +2,16 @@ require 'spec_helper' -describe PipelineProcessWorker do +describe PipelineProcessWorker, :clean_gitlab_redis_shared_state do + include ExclusiveLeaseHelpers + describe '#perform' do - context 'when pipeline exists' do - let(:pipeline) { create(:ci_pipeline) } + subject { worker.perform(pipeline.id) } + + let(:pipeline) { create(:ci_pipeline) } + let(:worker) { described_class.new } + context 'when pipeline exists' do it 'processes pipeline' do expect_any_instance_of(Ci::Pipeline).to receive(:process!) @@ -21,6 +26,35 @@ describe PipelineProcessWorker do .with([build.id]) described_class.new.perform(pipeline.id, [build.id]) + end + + context 'when the other sidekiq job has already been processing on the pipeline' do + before do + stub_exclusive_lease_taken("batch_pop_queueing:lock:pipeline_process_worker:#{pipeline.id}") + end + + it 'enqueues the pipeline id to the queue and does not process' do + expect_next_instance_of(Gitlab::BatchPopQueueing) do |queue| + expect(queue).to receive(:enqueue).with([pipeline.id], anything) + end + + expect_any_instance_of(Ci::Pipeline).not_to receive(:process!) + + subject + end + end + + context 'when there are some items are enqueued during the current process' do + before do + allow_any_instance_of(Gitlab::BatchPopQueueing).to receive(:safe_execute) do + { status: :finished, new_items: [pipeline.id] } + end + end + + it 're-executes PipelineProcessWorker asynchronously' do + expect(PipelineProcessWorker).to receive(:perform_async).with(pipeline.id) + + subject end end end @@ -31,5 +65,17 @@ describe PipelineProcessWorker do .not_to raise_error end end + + context 'when pipeline_process_worker_efficient_perform feature flag is disabled' do + before do + stub_feature_flags(enable_deduplicater_for_pipeline_process_worker: false) + end + + it 'processes without SidekiqJobDeduplicater' do + expect(Gitlab::BatchPopQueueing).not_to receive(:new) + + subject + end + end end end diff --git a/spec/workers/stuck_ci_jobs_worker_spec.rb b/spec/workers/stuck_ci_jobs_worker_spec.rb index c3d577e2dae..53320cdf761 100644 --- a/spec/workers/stuck_ci_jobs_worker_spec.rb +++ b/spec/workers/stuck_ci_jobs_worker_spec.rb @@ -13,6 +13,7 @@ describe StuckCiJobsWorker do subject(:worker) { described_class.new } before do + allow(PipelineProcessWorker).to receive(:perform_async) {} stub_exclusive_lease(worker_lease_key, worker_lease_uuid) job.update!(status: status, updated_at: updated_at) end |