summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShinya Maeda <shinya@gitlab.com>2019-08-01 15:50:05 +0700
committerShinya Maeda <shinya@gitlab.com>2019-08-20 14:25:54 +0700
commit5b1ff96f4d00ddc65a287d428c83308758622d80 (patch)
tree6d05cc742c7ed71eb065b468a9f3c0de3807b301
parentda9606512846aca61fb52f8afd6e9742426f8e3a (diff)
downloadgitlab-ce-de-duplicate-pipeline-process-sidekiq-jobs.tar.gz
De-duplicate pipeline process sidekiq jobsde-duplicate-pipeline-process-sidekiq-jobs
This commit deduplicates unnecessary pipeline process sidekiq jobs by using a batch pop queueing system. This way, duplicate jobs are processed later, and only once.
-rw-r--r--app/workers/concerns/sidekiq_job_deduplicater.rb73
-rw-r--r--app/workers/pipeline_process_worker.rb7
-rw-r--r--changelogs/unreleased/de-duplicate-pipeline-process-sidekiq-jobs.yml5
-rw-r--r--spec/workers/concerns/sidekiq_job_deduplicater_spec.rb65
-rw-r--r--spec/workers/pipeline_process_worker_spec.rb52
-rw-r--r--spec/workers/stuck_ci_jobs_worker_spec.rb1
6 files changed, 197 insertions, 6 deletions
diff --git a/app/workers/concerns/sidekiq_job_deduplicater.rb b/app/workers/concerns/sidekiq_job_deduplicater.rb
new file mode 100644
index 00000000000..aef0d674f20
--- /dev/null
+++ b/app/workers/concerns/sidekiq_job_deduplicater.rb
@@ -0,0 +1,73 @@
+# frozen_string_literal: true
+
+##
+# Deduplicate sidekiq jobs with BatchPopQueueing.
+#
+# Often a sidekiq worker is performed with the exact same argument multiple times.
+# This might make sense; however, it can be a waste of compute resources, because
+# sidekiq jobs are basically idempotent and the results will likely be the same. Also,
+# it can cause unnecessary race conditions when multiple jobs access the same
+# data at the same time.
+#
+# We can resolve such concerns by `SidekiqJobDeduplicater`, which
+# processes a single item exclusively. When multiple sidekiq jobs are initiated
+# with **the same argument**, the first item is executed and the following items
+# are enqueued into a batch-pop-queue instead of an immediate execution.
+# In this enqueueing process, the system removes duplicate items proactively, thus
+# the queue never holds more than one item, because we always enqueue the same item.
+# Once the sidekiq job has finished working on the first item, it works on the
+# same item again, which ensures that the work requested by every enqueued job is
+# performed after it was enqueued.
+#
+# NOTE:
+# - Currently, this class is not compatible with multiple arguments in sidekiq classes.
+# - If the sidekiq worker is often killed by sidekiq memory killer due to the memory consumption,
+# the exclusive lock might linger in Redis and it could interrupt the next execution.
+module SidekiqJobDeduplicater
+ extend ActiveSupport::Concern
+ extend ::Gitlab::Utils::Override
+
+ prepended do
+ class_attribute :deduplicater_default_enabled
+ class_attribute :deduplicater_lock_timeout
+
+ # The deduplicater is behind a feature flag, so the behavior can be turned
+ # off by disabling that flag.
+ # The flag is enabled by default; to make it disabled by default, set
+ # `deduplicater_default_enabled` to `false` in the worker class.
+ self.deduplicater_default_enabled = true
+
+ # The deduplicater runs the job under an exclusive lock. While the lock is
+ # held, duplicate sidekiq jobs are absorbed or deferred until the current
+ # job finishes.
+ # Set `deduplicater_lock_timeout` to a value greater than the maximum
+ # execution time of the sidekiq job.
+ self.deduplicater_lock_timeout = 10.minutes
+ end
+
+ override :perform
+ def perform(arg) # accepts exactly one positional argument
+ if Feature.enabled?(feature_flag_name, default_enabled: deduplicater_default_enabled)
+ result = Gitlab::BatchPopQueueing.new(sanitized_worker_name, arg.to_s)
+ .safe_execute([arg], lock_timeout: deduplicater_lock_timeout) do |items|
+ super(items.first) # run the worker's own #perform on the item
+ end
+
+ if result[:status] == :finished && result[:new_items].present?
+ self.class.perform_async(result[:new_items].first) # schedule one more run for items queued meanwhile
+ end
+ else
+ super(arg) # feature flag off: call the worker's own #perform directly
+ end
+ end
+
+ private
+
+ def sanitized_worker_name
+ self.class.name.underscore # e.g. PipelineProcessWorker -> 'pipeline_process_worker'
+ end
+
+ def feature_flag_name
+ "enable_deduplicater_for_#{sanitized_worker_name}"
+ end
+end
diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb
index 96524d93f8d..6caa744bb86 100644
--- a/app/workers/pipeline_process_worker.rb
+++ b/app/workers/pipeline_process_worker.rb
@@ -3,14 +3,15 @@
class PipelineProcessWorker
include ApplicationWorker
include PipelineQueue
+ prepend SidekiqJobDeduplicater
queue_namespace :pipeline_processing
- # rubocop: disable CodeReuse/ActiveRecord
+ self.deduplicater_default_enabled = false
+
def perform(pipeline_id, build_ids = nil)
- Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
+ Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
pipeline.process!(build_ids)
end
end
- # rubocop: enable CodeReuse/ActiveRecord
end
diff --git a/changelogs/unreleased/de-duplicate-pipeline-process-sidekiq-jobs.yml b/changelogs/unreleased/de-duplicate-pipeline-process-sidekiq-jobs.yml
new file mode 100644
index 00000000000..63a0512e8a7
--- /dev/null
+++ b/changelogs/unreleased/de-duplicate-pipeline-process-sidekiq-jobs.yml
@@ -0,0 +1,5 @@
+---
+title: De-duplicate pipeline process sidekiq jobs
+merge_request: 31370
+author:
+type: performance
diff --git a/spec/workers/concerns/sidekiq_job_deduplicater_spec.rb b/spec/workers/concerns/sidekiq_job_deduplicater_spec.rb
new file mode 100644
index 00000000000..15fff5330c9
--- /dev/null
+++ b/spec/workers/concerns/sidekiq_job_deduplicater_spec.rb
@@ -0,0 +1,65 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+describe SidekiqJobDeduplicater, :clean_gitlab_redis_shared_state do
+ let(:worker_class) { PipelineProcessWorker } # a worker that prepends the concern under test
+ let(:worker) { worker_class.new }
+ let(:arg) { 1 } # the single argument handed to #perform
+
+ subject { worker.perform(arg) }
+
+ it 'goes through Gitlab::BatchPopQueueing' do
+ queue = double(Gitlab::BatchPopQueueing)
+
+ allow(Gitlab::BatchPopQueueing)
+ .to receive(:new).with('pipeline_process_worker', arg.to_s) { queue }
+
+ expect(queue)
+ .to receive(:safe_execute).with([arg], lock_timeout: 10.minutes) { { status: :finished } } # 10.minutes is the concern's default timeout
+
+ subject
+ end
+
+ context 'when the result of BatchPopQueueing has new items' do
+ before do
+ allow_any_instance_of(Gitlab::BatchPopQueueing)
+ .to receive(:safe_execute) { { status: :finished, new_items: [1] } } # stubbed result carrying one new item
+ end
+
+ it 'performs the additional item asynchronously' do
+ expect(worker_class).to receive(:perform_async).with(1)
+
+ subject
+ end
+ end
+
+ context 'when deduplicater_lock_timeout is 1 hour' do
+ before do
+ worker.deduplicater_lock_timeout = 1.hour # override the timeout through the worker instance
+ end
+
+ it 'sets 1 hour to lock timeout' do
+ expect_next_instance_of(Gitlab::BatchPopQueueing) do |queue|
+ expect(queue).to receive(:safe_execute).with([arg], lock_timeout: 1.hour)
+ .and_call_original
+ end
+
+ subject
+ end
+ end
+
+ context 'when deduplicater_default_enabled is false' do
+ before do
+ worker.deduplicater_default_enabled = false
+ end
+
+ it 'sets default_enabled false to the feature flag' do
+ expect(Feature)
+ .to receive(:enabled?).with('enable_deduplicater_for_pipeline_process_worker',
+ default_enabled: false) { false } # stub reports the flag as disabled
+
+ subject
+ end
+ end
+end
diff --git a/spec/workers/pipeline_process_worker_spec.rb b/spec/workers/pipeline_process_worker_spec.rb
index ac677e3b555..aaa37b7f464 100644
--- a/spec/workers/pipeline_process_worker_spec.rb
+++ b/spec/workers/pipeline_process_worker_spec.rb
@@ -2,11 +2,16 @@
require 'spec_helper'
-describe PipelineProcessWorker do
+describe PipelineProcessWorker, :clean_gitlab_redis_shared_state do
+ include ExclusiveLeaseHelpers
+
describe '#perform' do
- context 'when pipeline exists' do
- let(:pipeline) { create(:ci_pipeline) }
+ subject { worker.perform(pipeline.id) }
+
+ let(:pipeline) { create(:ci_pipeline) }
+ let(:worker) { described_class.new }
+ context 'when pipeline exists' do
it 'processes pipeline' do
expect_any_instance_of(Ci::Pipeline).to receive(:process!)
@@ -21,6 +26,35 @@ describe PipelineProcessWorker do
.with([build.id])
described_class.new.perform(pipeline.id, [build.id])
+ end
+
+ context 'when the other sidekiq job has already been processing on the pipeline' do
+ before do
+ stub_exclusive_lease_taken("batch_pop_queueing:lock:pipeline_process_worker:#{pipeline.id}")
+ end
+
+ it 'enqueues the pipeline id to the queue and does not process' do
+ expect_next_instance_of(Gitlab::BatchPopQueueing) do |queue|
+ expect(queue).to receive(:enqueue).with([pipeline.id], anything)
+ end
+
+ expect_any_instance_of(Ci::Pipeline).not_to receive(:process!)
+
+ subject
+ end
+ end
+
+ context 'when there are some items are enqueued during the current process' do
+ before do
+ allow_any_instance_of(Gitlab::BatchPopQueueing).to receive(:safe_execute) do
+ { status: :finished, new_items: [pipeline.id] }
+ end
+ end
+
+ it 're-executes PipelineProcessWorker asynchronously' do
+ expect(PipelineProcessWorker).to receive(:perform_async).with(pipeline.id)
+
+ subject
end
end
end
@@ -31,5 +65,17 @@ describe PipelineProcessWorker do
.not_to raise_error
end
end
+
+ context 'when pipeline_process_worker_efficient_perform feature flag is disabled' do
+ before do
+ stub_feature_flags(enable_deduplicater_for_pipeline_process_worker: false)
+ end
+
+ it 'processes without SidekiqJobDeduplicater' do
+ expect(Gitlab::BatchPopQueueing).not_to receive(:new)
+
+ subject
+ end
+ end
end
end
diff --git a/spec/workers/stuck_ci_jobs_worker_spec.rb b/spec/workers/stuck_ci_jobs_worker_spec.rb
index c3d577e2dae..53320cdf761 100644
--- a/spec/workers/stuck_ci_jobs_worker_spec.rb
+++ b/spec/workers/stuck_ci_jobs_worker_spec.rb
@@ -13,6 +13,7 @@ describe StuckCiJobsWorker do
subject(:worker) { described_class.new }
before do
+ allow(PipelineProcessWorker).to receive(:perform_async) {}
stub_exclusive_lease(worker_lease_key, worker_lease_uuid)
job.update!(status: status, updated_at: updated_at)
end