1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
|
# frozen_string_literal: true
require 'digest'
require 'msgpack'
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
# This class defines an identifier of a job in a queue.
# The identifier is based on a job's class and arguments.
#
# A strategy decides when to keep track of the job in redis and when to
# remove it.
#
# Storing the deduplication key in redis can be done by calling `#check!`.
# `#check!` returns the `jid` of the job if it was scheduled, or the `jid` of
# the duplicate job if it was already scheduled.
#
# When new jobs can be scheduled again, the strategy calls `#delete!`.
class DuplicateJob
# Provides `strong_memoize`, which `#latest_wal_locations` uses.
include Gitlab::Utils::StrongMemoize
# Fallback cookie lifetime, used by `#duplicate_key_ttl` when the worker's
# deduplication options do not set `:ttl`.
DEFAULT_DUPLICATE_KEY_TTL = 6.hours
# Strategy returned by `#strategy` when the worker class cannot be resolved
# or does not respond to `idempotent?`.
DEFAULT_STRATEGY = :until_executing
# Strategy returned by `#strategy` when the worker class reports that
# deduplication is not enabled.
STRATEGY_NONE = :none
# The jid read from the cookie; assigned by `#check!`.
attr_reader :existing_jid
# Wraps a job payload so its duplicates can be tracked.
#
# @param job hash-like job payload; entries such as 'jid', 'class', 'args'
#   and 'at' are read from it, and `#check!` writes 'idempotency_key' to it
# @param queue_name name of the queue, used when building the key namespace
def initialize(job, queue_name)
  @queue_name = queue_name
  @job = job
end
# Hands the job to the `schedule` step of its deduplication strategy.
# The middleware chain (the given block) is continued if the job should be
# scheduled; false is returned if the job needs to be cancelled.
def schedule(&block)
  strategy_class = Strategies.for(strategy)
  strategy_class.new(self).schedule(job, &block)
end
# Hands the job to the `perform` step of its deduplication strategy.
# The server middleware chain (the given block) is continued if the job
# should be executed; false is returned if the job should not be executed.
def perform(&block)
  strategy_class = Strategies.for(strategy)
  strategy_class.new(self).perform(job, &block)
end
# Stores this job's cookie in Redis unless a cookie already exists under the
# same key, then records the jid and WAL locations of whichever cookie is
# stored. Also writes the idempotency key into the job payload.
#
# @param expiry passed as the `ex:` option of the Redis SET
# @return the jid found in the stored cookie
def check!(expiry = duplicate_key_ttl)
  own_cookie = {
    'jid' => jid,
    'offsets' => {},
    'wal_locations' => {},
    'existing_wal_locations' => job_wal_locations
  }

  # One attempt has 3 possible outcomes, in order of decreasing likelihood:
  #   1. SET NX succeeds: our own cookie is the stored one.
  #   2. SET NX fails and GET returns the existing cookie.
  #   3. SET NX fails and the key expires before GET, which then comes back
  #      empty. In this case we must retry.
  stored_cookie = {}
  while stored_cookie.empty?
    claimed = with_redis do |redis|
      redis.set(cookie_key, own_cookie.to_msgpack, nx: true, ex: expiry)
    end
    stored_cookie = claimed ? own_cookie : get_cookie
  end

  job['idempotency_key'] = idempotency_key
  self.existing_wal_locations = stored_cookie['existing_wal_locations']
  self.existing_jid = stored_cookie['jid']
end
# Sends this job's WAL locations to Redis by running
# UPDATE_WAL_COOKIE_SCRIPT against the cookie key. The script receives one
# (connection name, offset, location) triple per connection.
# Does nothing when the job carries no WAL locations.
def update_latest_wal_location!
  return unless job_wal_locations.present?

  script_args = job_wal_locations.flat_map do |connection_name, location|
    [connection_name, pg_wal_lsn_diff(connection_name), location]
  end

  with_redis do |redis|
    redis.eval(UPDATE_WAL_COOKIE_SCRIPT, keys: [cookie_key], argv: script_args)
  end
end
# Generally speaking, updating a Redis key by deserializing and
# serializing it on the Redis server is bad for performance. However in
# the case of DuplicateJobs we know that key updates are rare, and the
# most common operations are setting, getting and deleting the key. The
# aim of this design is to make the common operations as fast as
# possible.
#
# KEYS[1] is the cookie key. ARGV holds triples of
# (connection name, offset, WAL location), as built by
# `#update_latest_wal_location!`. The script returns without changes when
# the key does not exist. Otherwise, for each connection, the cookie's
# offset and WAL location are replaced when no offset is stored yet or when
# the new offset is greater than the stored one, and the cookie is written
# back with the TTL the key currently has.
#
# `tonumber` yields nil for an offset it cannot parse. Such an offset is
# never compared against a stored offset, because comparing a number with
# nil raises an error in Lua.
#
# NOTE(review): `SET ... EX` presumably rejects a non-positive expiry;
# confirm whether `ttl` can return 0 or -1 for the key at this point.
UPDATE_WAL_COOKIE_SCRIPT = <<~LUA
  local cookie_msgpack = redis.call("get", KEYS[1])
  if not cookie_msgpack then
    return
  end
  local cookie = cmsgpack.unpack(cookie_msgpack)
  for i = 1, #ARGV, 3 do
    local connection = ARGV[i]
    local current_offset = cookie.offsets[connection]
    local new_offset = tonumber(ARGV[i+1])
    if not current_offset or (new_offset and current_offset < new_offset) then
      cookie.offsets[connection] = new_offset
      cookie.wal_locations[connection] = ARGV[i+2]
    end
  end
  redis.call("set", KEYS[1], cmsgpack.pack(cookie), "ex", redis.call("ttl", KEYS[1]))
LUA
# The 'wal_locations' entry of the cookie stored in Redis, memoized after
# the first lookup. Returns an empty hash, without reading the cookie, when
# the job itself carries no WAL locations; also falls back to an empty hash
# when the cookie has no 'wal_locations' entry.
def latest_wal_locations
  if job_wal_locations.present?
    strong_memoize(:latest_wal_locations) { get_cookie.fetch('wal_locations', {}) }
  else
    {}
  end
end
# Deletes the cookie key from Redis.
def delete!
  with_redis do |redis|
    redis.del(cookie_key)
  end
end
# Logs the reschedule through the deduplication logger, then enqueues the
# worker again with the same arguments via `perform_async`.
def reschedule
  deduplication_logger = Gitlab::SidekiqLogging::DeduplicationLogger.instance
  deduplication_logger.rescheduled_log(job)

  worker_klass.perform_async(*arguments)
end
# Whether the job payload has a scheduled time, i.e. whether
# `#scheduled_at` is present.
def scheduled?
scheduled_at.present?
end
# Whether the jid recorded by `#check!` belongs to a different job.
#
# @raise [RuntimeError] when no existing jid has been recorded yet
def duplicate?
  if existing_jid
    jid != existing_jid
  else
    raise "Call `#check!` first to check for existing duplicates"
  end
end
# Runs DEDUPLICATED_SCRIPT against the cookie key when the job is
# reschedulable; does nothing otherwise.
#
# @param expiry accepted but not used by this method
def set_deduplicated_flag!(expiry = duplicate_key_ttl)
  if reschedulable?
    with_redis do |redis|
      redis.eval(DEDUPLICATED_SCRIPT, keys: [cookie_key])
    end
  end
end
# Lua script run by `#set_deduplicated_flag!`. KEYS[1] is the cookie key.
# Returns without changes when the key does not exist; otherwise sets the
# cookie's `deduplicated` field to "1" and writes the cookie back with the
# TTL the key currently has.
DEDUPLICATED_SCRIPT = <<~LUA
local cookie_msgpack = redis.call("get", KEYS[1])
if not cookie_msgpack then
return
end
local cookie = cmsgpack.unpack(cookie_msgpack)
cookie.deduplicated = "1"
redis.call("set", KEYS[1], cmsgpack.pack(cookie), "ex", redis.call("ttl", KEYS[1]))
LUA
# Whether the job is reschedulable and the cookie in Redis carries the
# 'deduplicated' flag. The cookie is only read for reschedulable jobs.
def should_reschedule?
  return false unless reschedulable?

  get_cookie['deduplicated'].present?
end
# The 'at' entry of the job payload.
def scheduled_at
job['at']
end
# Deduplication options declared by the worker class. Returns an empty hash
# when the worker class cannot be resolved or does not respond to
# `get_deduplication_options`.
def options
  klass = worker_klass

  if klass && klass.respond_to?(:get_deduplication_options)
    klass.get_deduplication_options
  else
    {}
  end
end
# Whether the worker class declares itself idempotent. Returns false when
# the worker class cannot be resolved or does not respond to `idempotent?`.
def idempotent?
  klass = worker_klass
  return false unless klass && klass.respond_to?(:idempotent?)

  klass.idempotent?
end
# Lifetime used for the cookie key: the worker's `:ttl` deduplication
# option when set, DEFAULT_DUPLICATE_KEY_TTL otherwise.
def duplicate_key_ttl
options[:ttl] || DEFAULT_DUPLICATE_KEY_TTL
end
private
# Assigned by `#check!` from the cookie found in Redis.
attr_writer :existing_wal_locations
# The queue name and job payload given to the constructor.
attr_reader :queue_name, :job
# Assigned by `#check!`; the matching reader is public.
attr_writer :existing_jid
# The worker class named in the job payload, resolved with
# `safe_constantize`. A truthy result is cached; a nil result is looked up
# again on the next call.
def worker_klass
  return @worker_klass if @worker_klass

  @worker_klass = worker_class_name.to_s.safe_constantize
end
# The 'wal_locations' entry of the job payload, or an empty hash when that
# entry is nil or false.
def job_wal_locations
job['wal_locations'] || {}
end
# Calls `wal_diff` on the load balancer of the base model registered for
# the connection, passing this job's WAL location first and the location
# from the existing cookie second.
#
# @param connection_name key into the job's and the cookie's WAL locations;
#   converted to a symbol to look up the base model
# @return whatever `wal_diff` returns
def pg_wal_lsn_diff(connection_name)
  base_model = Gitlab::Database.database_base_models[connection_name.to_sym]
  load_balancer = base_model.connection.load_balancer

  load_balancer.wal_diff(
    job_wal_locations[connection_name],
    existing_wal_locations[connection_name]
  )
end
# Picks the deduplication strategy name for the job:
# - DEFAULT_STRATEGY when the worker class cannot be resolved or does not
#   respond to `idempotent?`;
# - STRATEGY_NONE when the worker class has deduplication disabled;
# - otherwise the strategy the worker class declares.
def strategy
  klass = worker_klass

  if !klass || !klass.respond_to?(:idempotent?)
    DEFAULT_STRATEGY
  elsif klass.deduplication_enabled?
    klass.get_deduplicate_strategy
  else
    STRATEGY_NONE
  end
end
# The 'class' entry of the job payload.
def worker_class_name
job['class']
end
# The 'args' entry of the job payload.
def arguments
job['args']
end
# The 'jid' entry of the job payload.
def jid
job['jid']
end
# Redis key under which the cookie is stored: the idempotency key followed
# by a fixed ":cookie:v2" suffix.
def cookie_key
  base_key = idempotency_key
  "#{base_key}:cookie:v2"
end
# Reads the cookie from Redis and decodes it with MessagePack. When the key
# is missing, "\x80" (the MessagePack encoding of an empty map) is decoded
# instead, so callers get an empty hash.
def get_cookie
  with_redis do |redis|
    packed_cookie = redis.get(cookie_key) || "\x80"
    MessagePack.unpack(packed_cookie)
  end
end
# Key that identifies the job for deduplication, memoized once computed.
# An 'idempotency_key' already present in the job payload takes precedence;
# otherwise the key is built as "<namespace>:<idempotency hash>".
def idempotency_key
  return @idempotency_key if @idempotency_key

  @idempotency_key = job['idempotency_key'] || "#{namespace}:#{idempotency_hash}"
end
# Hex-encoded SHA-256 digest of the idempotency string.
def idempotency_hash
  payload = idempotency_string
  Digest::SHA256.hexdigest(payload)
end
# Prefix of the idempotency key: the Sidekiq Redis namespace, the literal
# "duplicate" segment and the queue name, joined with colons.
def namespace
  sidekiq_namespace = Gitlab::Redis::Queues::SIDEKIQ_NAMESPACE
  "#{sidekiq_namespace}:duplicate:#{queue_name}"
end
# String that is hashed into the idempotency key: the worker class name and
# the job arguments serialized with `Sidekiq.dump_json`, separated by a colon.
def idempotency_string
  class_name = worker_class_name
  serialized_arguments = Sidekiq.dump_json(arguments)

  "#{class_name}:#{serialized_arguments}"
end
# WAL locations taken from the existing cookie by `#check!`. Falls back to
# an empty hash, which is then kept, while nothing has been recorded.
def existing_wal_locations
  return @existing_wal_locations if @existing_wal_locations

  @existing_wal_locations = {}
end
# Whether the job may be rescheduled after being deduplicated: it must not
# be a scheduled job, and the worker's `:if_deduplicated` option must be
# `:reschedule_once`.
def reschedulable?
  return false if scheduled?

  options[:if_deduplicated] == :reschedule_once
end
# Passes the given block to `Sidekiq.redis`. Callers in this class use the
# block's parameter as the Redis client.
def with_redis(&block)
Sidekiq.redis(&block) # rubocop:disable Cop/SidekiqRedisCall
end
end
end
end
end
|