1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module SizeLimiter
# Handle a Sidekiq job payload limit based on current configuration.
# This validator pulls the configuration from application settings:
# - limiter_mode: the current mode of the size
# limiter. This must be either `track` or `compress`.
# - compression_threshold_bytes: the threshold before the input job
# payload is compressed.
# - limit_bytes: the size limit in bytes.
#
# In track mode, if a job payload limit exceeds the size limit, an
# event is sent to Sentry and the job is scheduled like normal.
#
# In compress mode, if a job payload limit exceeds the threshold, it is
# then compressed. If the compressed payload still exceeds the limit, the
# job is discarded, and a ExceedLimitError exception is raised.
class Validator
# Avoid limiting the size of jobs for `BackgroundMigrationWorker` classes.
# We can't read the configuration from `ApplicationSetting` for those jobs
# when migrating a path that modifies the `application_settings` table.
# Reading the application settings through `ApplicationSetting#current`
# causes a `SELECT` with a list of column names, but that list of column
# names might not match what the table currently looks like causing
# an error when scheduling background migrations.
#
# The worker classes aren't constants here, because that would force
# Application Settings to be loaded earlier causing failures loading
# the environment in rake tasks
EXEMPT_WORKER_NAMES = ["BackgroundMigrationWorker", "Database::BatchedBackgroundMigrationWorker"].to_set
JOB_STATUS_KEY = 'size_limiter'
class << self
def validate!(worker_class, job)
return if EXEMPT_WORKER_NAMES.include?(worker_class.to_s)
return if validated?(job)
new(worker_class, job).validate!
end
def validated?(job)
job.has_key?(JOB_STATUS_KEY)
end
end
DEFAULT_SIZE_LIMIT = 0
DEFAULT_COMPRESSION_THRESHOLD_BYTES = 100_000 # 100kb
MODES = [
TRACK_MODE = 'track',
COMPRESS_MODE = 'compress'
].freeze
attr_reader :mode, :size_limit, :compression_threshold
def initialize(
worker_class, job,
mode: Gitlab::CurrentSettings.sidekiq_job_limiter_mode,
compression_threshold: Gitlab::CurrentSettings.sidekiq_job_limiter_compression_threshold_bytes,
size_limit: Gitlab::CurrentSettings.sidekiq_job_limiter_limit_bytes
)
@worker_class = worker_class
@job = job
set_mode(mode)
set_compression_threshold(compression_threshold)
set_size_limit(size_limit)
end
def validate!
@job[JOB_STATUS_KEY] = 'validated'
job_args = compress_if_necessary(::Sidekiq.dump_json(@job['args']))
return if @size_limit == 0
return if job_args.bytesize <= @size_limit
return if allow_big_payload?
exception = exceed_limit_error(job_args)
if compress_mode?
@job.delete(JOB_STATUS_KEY)
raise exception
else
@job[JOB_STATUS_KEY] = 'tracked'
track(exception)
end
end
private
def set_mode(mode)
@mode = (mode || TRACK_MODE).to_s.strip
unless MODES.include?(@mode)
::Sidekiq.logger.warn "Invalid Sidekiq size limiter mode: #{@mode}. Fallback to #{TRACK_MODE} mode."
@mode = TRACK_MODE
end
end
def set_compression_threshold(compression_threshold)
@compression_threshold = (compression_threshold || DEFAULT_COMPRESSION_THRESHOLD_BYTES).to_i
if @compression_threshold <= 0
::Sidekiq.logger.warn "Invalid Sidekiq size limiter compression threshold: #{@compression_threshold}"
@compression_threshold = DEFAULT_COMPRESSION_THRESHOLD_BYTES
end
end
def set_size_limit(size_limit)
@size_limit = (size_limit || DEFAULT_SIZE_LIMIT).to_i
if @size_limit < 0
::Sidekiq.logger.warn "Invalid Sidekiq size limiter limit: #{@size_limit}"
@size_limit = DEFAULT_SIZE_LIMIT
end
end
def exceed_limit_error(job_args)
ExceedLimitError.new(@worker_class, job_args.bytesize, @size_limit).tap do |exception|
# This should belong to Gitlab::ErrorTracking. We'll remove this
# after this epic is done:
# https://gitlab.com/groups/gitlab-com/gl-infra/-/epics/396
exception.set_backtrace(backtrace)
end
end
def compress_if_necessary(job_args)
return job_args unless compress_mode?
return job_args if job_args.bytesize < @compression_threshold
# When a job was scheduled in the future, it runs through the middleware
# twice. Once on scheduling and once on queueing. No need to compress twice.
return job_args if ::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor.compressed?(@job)
::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor.compress(@job, job_args)
end
def allow_big_payload?
worker_class = @worker_class.to_s.safe_constantize
worker_class.respond_to?(:big_payload?) && worker_class.big_payload?
end
def compress_mode?
@mode == COMPRESS_MODE
end
def track(exception)
Gitlab::ErrorTracking.track_exception(exception)
end
def backtrace
Gitlab::BacktraceCleaner.clean_backtrace(caller)
end
end
end
end
end
|