summaryrefslogtreecommitdiff
path: root/lib/gitlab/sidekiq_middleware
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/sidekiq_middleware')
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb10
-rw-r--r--lib/gitlab/sidekiq_middleware/instrumentation_logger.rb19
-rw-r--r--lib/gitlab/sidekiq_middleware/server_metrics.rb13
-rw-r--r--lib/gitlab/sidekiq_middleware/size_limiter/compressor.rb52
-rw-r--r--lib/gitlab/sidekiq_middleware/size_limiter/server.rb18
-rw-r--r--lib/gitlab/sidekiq_middleware/size_limiter/validator.rb91
6 files changed, 151 insertions, 52 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
index 79ac853ea0c..4cf540ce3b8 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
@@ -19,6 +19,7 @@ module Gitlab
class DuplicateJob
DUPLICATE_KEY_TTL = 6.hours
DEFAULT_STRATEGY = :until_executing
+ STRATEGY_NONE = :none
attr_reader :existing_jid
@@ -51,6 +52,8 @@ module Gitlab
end
end
+ job['idempotency_key'] = idempotency_key
+
self.existing_jid = read_jid.value
end
@@ -100,6 +103,7 @@ module Gitlab
def strategy
return DEFAULT_STRATEGY unless worker_klass
return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?)
+ return STRATEGY_NONE unless worker_klass.deduplication_enabled?
worker_klass.get_deduplicate_strategy
end
@@ -117,7 +121,7 @@ module Gitlab
end
def idempotency_key
- @idempotency_key ||= "#{namespace}:#{idempotency_hash}"
+ @idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}"
end
def idempotency_hash
@@ -129,6 +133,10 @@ module Gitlab
end
def idempotency_string
+ # TODO: dump the argument's JSON using `Sidekiq.dump_json` instead
+ # this should be done in the next release so all jobs are written
+ # with their idempotency key.
+ # see https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1090
"#{worker_class_name}:#{arguments.join('-')}"
end
end
diff --git a/lib/gitlab/sidekiq_middleware/instrumentation_logger.rb b/lib/gitlab/sidekiq_middleware/instrumentation_logger.rb
index b542aa4fe4c..1f0c63c5fff 100644
--- a/lib/gitlab/sidekiq_middleware/instrumentation_logger.rb
+++ b/lib/gitlab/sidekiq_middleware/instrumentation_logger.rb
@@ -3,24 +3,6 @@
module Gitlab
module SidekiqMiddleware
class InstrumentationLogger
- def self.keys
- @keys ||= [
- :cpu_s,
- :gitaly_calls,
- :gitaly_duration_s,
- :rugged_calls,
- :rugged_duration_s,
- :elasticsearch_calls,
- :elasticsearch_duration_s,
- :elasticsearch_timed_out_count,
- *::Gitlab::Memory::Instrumentation::KEY_MAPPING.values,
- *::Gitlab::Instrumentation::Redis.known_payload_keys,
- *::Gitlab::Metrics::Subscribers::ActiveRecord.known_payload_keys,
- *::Gitlab::Metrics::Subscribers::ExternalHttp::KNOWN_PAYLOAD_KEYS,
- *::Gitlab::Metrics::Subscribers::RackAttack::PAYLOAD_KEYS
- ]
- end
-
def call(worker, job, queue)
::Gitlab::InstrumentationHelper.init_instrumentation_data
@@ -37,7 +19,6 @@ module Gitlab
# https://github.com/mperham/sidekiq/blob/53bd529a0c3f901879925b8390353129c465b1f2/lib/sidekiq/processor.rb#L115-L118
job[:instrumentation] = {}.tap do |instrumentation_values|
::Gitlab::InstrumentationHelper.add_instrumentation_data(instrumentation_values)
- instrumentation_values.slice!(*self.class.keys)
end
end
end
diff --git a/lib/gitlab/sidekiq_middleware/server_metrics.rb b/lib/gitlab/sidekiq_middleware/server_metrics.rb
index 474afffcf93..6d130957f36 100644
--- a/lib/gitlab/sidekiq_middleware/server_metrics.rb
+++ b/lib/gitlab/sidekiq_middleware/server_metrics.rb
@@ -13,6 +13,10 @@ module Gitlab
@metrics = init_metrics
@metrics[:sidekiq_concurrency].set({}, Sidekiq.options[:concurrency].to_i)
+
+ if ::Gitlab::Database::LoadBalancing.enable?
+ @metrics[:sidekiq_load_balancing_count] = ::Gitlab::Metrics.counter(:sidekiq_load_balancing_count, 'Sidekiq jobs with load balancing')
+ end
end
def call(worker, job, queue)
@@ -69,6 +73,15 @@ module Gitlab
@metrics[:sidekiq_redis_requests_duration_seconds].observe(labels, get_redis_time(instrumentation))
@metrics[:sidekiq_elasticsearch_requests_total].increment(labels, get_elasticsearch_calls(instrumentation))
@metrics[:sidekiq_elasticsearch_requests_duration_seconds].observe(labels, get_elasticsearch_time(instrumentation))
+
+ if ::Gitlab::Database::LoadBalancing.enable? && job[:database_chosen]
+ load_balancing_labels = {
+ database_chosen: job[:database_chosen],
+ data_consistency: job[:data_consistency]
+ }
+
+ @metrics[:sidekiq_load_balancing_count].increment(labels.merge(load_balancing_labels), 1)
+ end
end
end
diff --git a/lib/gitlab/sidekiq_middleware/size_limiter/compressor.rb b/lib/gitlab/sidekiq_middleware/size_limiter/compressor.rb
new file mode 100644
index 00000000000..bce295d8ba5
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/size_limiter/compressor.rb
@@ -0,0 +1,52 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ module SizeLimiter
+ class Compressor
+ PayloadDecompressionConflictError = Class.new(StandardError)
+ PayloadDecompressionError = Class.new(StandardError)
+
+ # Level 5 is a good trade-off between space and time
+ # https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1054#note_568129605
+ COMPRESS_LEVEL = 5
+ ORIGINAL_SIZE_KEY = 'original_job_size_bytes'
+ COMPRESSED_KEY = 'compressed'
+
+ def self.compressed?(job)
+ job&.has_key?(COMPRESSED_KEY)
+ end
+
+ def self.compress(job, job_args)
+ compressed_args = Base64.strict_encode64(Zlib::Deflate.deflate(job_args, COMPRESS_LEVEL))
+
+ job[COMPRESSED_KEY] = true
+ job[ORIGINAL_SIZE_KEY] = job_args.bytesize
+ job['args'] = [compressed_args]
+
+ compressed_args
+ end
+
+ def self.decompress(job)
+ return unless compressed?(job)
+
+ validate_args!(job)
+
+ job.except!(ORIGINAL_SIZE_KEY, COMPRESSED_KEY)
+ job['args'] = Sidekiq.load_json(Zlib::Inflate.inflate(Base64.strict_decode64(job['args'].first)))
+ rescue Zlib::Error
+ raise PayloadDecompressionError, 'Fail to decompress Sidekiq job payload'
+ end
+
+ def self.validate_args!(job)
+ if job['args'] && job['args'].length != 1
+ exception = PayloadDecompressionConflictError.new('Sidekiq argument list should include 1 argument.\
+ This means that there is another a middleware interfering with the job payload.\
+ That conflicts with the payload compressor')
+ ::Gitlab::ErrorTracking.track_and_raise_exception(exception)
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_middleware/size_limiter/server.rb b/lib/gitlab/sidekiq_middleware/size_limiter/server.rb
new file mode 100644
index 00000000000..70b384c8f28
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/size_limiter/server.rb
@@ -0,0 +1,18 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ module SizeLimiter
+ class Server
+ def call(worker, job, queue)
+ # This middleware should always decompress jobs regardless of the
+ # limiter mode or size limit. Otherwise, this could leave compressed
+ # payloads in queues that are then not able to be processed.
+ ::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor.decompress(job)
+
+ yield
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb b/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb
index 2c50c4a2157..d86f1609f14 100644
--- a/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb
+++ b/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb
@@ -3,76 +3,103 @@
module Gitlab
module SidekiqMiddleware
module SizeLimiter
- # Validate a Sidekiq job payload limit based on current configuration.
+ # Handle a Sidekiq job payload limit based on current configuration.
# This validator pulls the configuration from the environment variables:
- #
# - GITLAB_SIDEKIQ_SIZE_LIMITER_MODE: the current mode of the size
- # limiter. This must be either `track` or `raise`.
- #
+ # limiter. This must be either `track` or `compress`.
+ # - GITLAB_SIDEKIQ_SIZE_LIMITER_COMPRESSION_THRESHOLD_BYTES: the
+ # threshold before the input job payload is compressed.
# - GITLAB_SIDEKIQ_SIZE_LIMITER_LIMIT_BYTES: the size limit in bytes.
#
- # If the size of job payload after serialization exceeds the limit, an
- # error is tracked raised adhering to the mode.
+ # In track mode, if a job payload limit exceeds the size limit, an
+ # event is sent to Sentry and the job is scheduled like normal.
+ #
+ # In compress mode, if a job payload limit exceeds the threshold, it is
+ # then compressed. If the compressed payload still exceeds the limit, the
+ # job is discarded, and a ExceedLimitError exception is raised.
class Validator
def self.validate!(worker_class, job)
new(worker_class, job).validate!
end
DEFAULT_SIZE_LIMIT = 0
+ DEFAULT_COMPRESION_THRESHOLD_BYTES = 100_000 # 100kb
MODES = [
TRACK_MODE = 'track',
- RAISE_MODE = 'raise'
+ COMPRESS_MODE = 'compress'
].freeze
- attr_reader :mode, :size_limit
+ attr_reader :mode, :size_limit, :compression_threshold
def initialize(
worker_class, job,
mode: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_MODE'],
+ compression_threshold: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_COMPRESSION_THRESHOLD_BYTES'],
size_limit: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_LIMIT_BYTES']
)
@worker_class = worker_class
@job = job
+ set_mode(mode)
+ set_compression_threshold(compression_threshold)
+ set_size_limit(size_limit)
+ end
+
+ def validate!
+ return unless @size_limit > 0
+ return if allow_big_payload?
+
+ job_args = compress_if_necessary(::Sidekiq.dump_json(@job['args']))
+ return if job_args.bytesize <= @size_limit
+
+ exception = exceed_limit_error(job_args)
+ if compress_mode?
+ raise exception
+ else
+ track(exception)
+ end
+ end
+
+ private
+
+ def set_mode(mode)
@mode = (mode || TRACK_MODE).to_s.strip
unless MODES.include?(@mode)
::Sidekiq.logger.warn "Invalid Sidekiq size limiter mode: #{@mode}. Fallback to #{TRACK_MODE} mode."
@mode = TRACK_MODE
end
+ end
+
+ def set_compression_threshold(compression_threshold)
+ @compression_threshold = (compression_threshold || DEFAULT_COMPRESION_THRESHOLD_BYTES).to_i
+ if @compression_threshold <= 0
+ ::Sidekiq.logger.warn "Invalid Sidekiq size limiter compression threshold: #{@compression_threshold}"
+ @compression_threshold = DEFAULT_COMPRESION_THRESHOLD_BYTES
+ end
+ end
+ def set_size_limit(size_limit)
@size_limit = (size_limit || DEFAULT_SIZE_LIMIT).to_i
if @size_limit < 0
::Sidekiq.logger.warn "Invalid Sidekiq size limiter limit: #{@size_limit}"
end
end
- def validate!
- return unless @size_limit > 0
-
- return if allow_big_payload?
- return if job_size <= @size_limit
-
- exception = ExceedLimitError.new(@worker_class, job_size, @size_limit)
- # This should belong to Gitlab::ErrorTracking. We'll remove this
- # after this epic is done:
- # https://gitlab.com/groups/gitlab-com/gl-infra/-/epics/396
- exception.set_backtrace(backtrace)
-
- if raise_mode?
- raise exception
- else
- track(exception)
+ def exceed_limit_error(job_args)
+ ExceedLimitError.new(@worker_class, job_args.bytesize, @size_limit).tap do |exception|
+ # This should belong to Gitlab::ErrorTracking. We'll remove this
+ # after this epic is done:
+ # https://gitlab.com/groups/gitlab-com/gl-infra/-/epics/396
+ exception.set_backtrace(backtrace)
end
end
- private
+ def compress_if_necessary(job_args)
+ return job_args unless compress_mode?
+ return job_args if job_args.bytesize < @compression_threshold
- def job_size
- # This maynot be the optimal solution, but can be acceptable solution
- # for now. Internally, Sidekiq calls Sidekiq.dump_json everywhere.
- # There is no clean way to intefere to prevent double serialization.
- @job_size ||= ::Sidekiq.dump_json(@job).bytesize
+ ::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor.compress(@job, job_args)
end
def allow_big_payload?
@@ -80,8 +107,8 @@ module Gitlab
worker_class.respond_to?(:big_payload?) && worker_class.big_payload?
end
- def raise_mode?
- @mode == RAISE_MODE
+ def compress_mode?
+ @mode == COMPRESS_MODE
end
def track(exception)