1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
|
# frozen_string_literal: true
# Add capabilities to increment a numeric model attribute efficiently by
# using Redis and flushing the increments asynchronously to the database
# after a period of time (10 minutes).
# When an attribute is incremented by a value, the increment is added
# to a Redis key. Then, FlushCounterIncrementsWorker will execute
# `flush_increments_to_database!` which removes increments from Redis for a
# given model attribute and updates the values in the database.
#
# @example:
#
# class ProjectStatistics
# include CounterAttribute
#
# counter_attribute :commit_count
# counter_attribute :storage_size
# end
#
# To increment the counter we can use the method:
# delayed_increment_counter(:commit_count, 3)
#
# It is possible to register callbacks to be executed after increments have
# been flushed to the database. Callbacks are not executed if there are no increments
# to flush.
#
# counter_attribute_after_flush do |statistic|
# Namespaces::ScheduleAggregationWorker.perform_async(statistic.namespace_id)
# end
#
module CounterAttribute
extend ActiveSupport::Concern
extend AfterCommitQueue
include Gitlab::ExclusiveLeaseHelpers
LUA_STEAL_INCREMENT_SCRIPT = <<~EOS
local increment_key, flushed_key = KEYS[1], KEYS[2]
local increment_value = redis.call("get", increment_key) or 0
local flushed_value = redis.call("incrby", flushed_key, increment_value)
if flushed_value == 0 then
redis.call("del", increment_key, flushed_key)
else
redis.call("del", increment_key)
end
return flushed_value
EOS
WORKER_DELAY = 10.minutes
WORKER_LOCK_TTL = 10.minutes
class_methods do
def counter_attribute(attribute)
counter_attributes << attribute
end
def counter_attributes
@counter_attributes ||= Set.new
end
def after_flush_callbacks
@after_flush_callbacks ||= []
end
# perform registered callbacks after increments have been flushed to the database
def counter_attribute_after_flush(&callback)
after_flush_callbacks << callback
end
def counter_attribute_enabled?(attribute)
counter_attributes.include?(attribute)
end
end
# This method must only be called by FlushCounterIncrementsWorker
# because it should run asynchronously and with exclusive lease.
# This will
# 1. temporarily move the pending increment for a given attribute
# to a relative "flushed" Redis key, delete the increment key and return
# the value. If new increments are performed at this point, the increment
# key is recreated as part of `delayed_increment_counter`.
# The "flushed" key is used to ensure that we can keep incrementing
# counters in Redis while flushing existing values.
# 2. then the value is used to update the counter in the database.
# 3. finally the "flushed" key is deleted.
def flush_increments_to_database!(attribute)
lock_key = counter_lock_key(attribute)
with_exclusive_lease(lock_key) do
previous_db_value = read_attribute(attribute)
increment_key = counter_key(attribute)
flushed_key = counter_flushed_key(attribute)
increment_value = steal_increments(increment_key, flushed_key)
new_db_value = nil
next if increment_value == 0
transaction do
update_counters_with_lease({ attribute => increment_value })
redis_state { |redis| redis.del(flushed_key) }
new_db_value = reset.read_attribute(attribute)
end
execute_after_flush_callbacks
log_flush_counter(attribute, increment_value, previous_db_value, new_db_value)
end
end
def delayed_increment_counter(attribute, increment)
raise ArgumentError, "#{attribute} is not a counter attribute" unless counter_attribute_enabled?(attribute)
return if increment == 0
run_after_commit_or_now do
increment_counter(attribute, increment)
FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, self.class.name, self.id, attribute)
end
true
end
def increment_counter(attribute, increment)
if counter_attribute_enabled?(attribute)
new_value = redis_state do |redis|
redis.incrby(counter_key(attribute), increment)
end
log_increment_counter(attribute, increment, new_value)
end
end
def update_counters_with_lease(increments)
detect_race_on_record(log_fields: { caller: __method__, attributes: increments.keys }) do
self.class.update_counters(id, increments)
end
end
def reset_counter!(attribute)
if counter_attribute_enabled?(attribute)
detect_race_on_record(log_fields: { caller: __method__, attributes: attribute }) do
update!(attribute => 0)
clear_counter!(attribute)
end
log_clear_counter(attribute)
end
end
def get_counter_value(attribute)
if counter_attribute_enabled?(attribute)
redis_state do |redis|
redis.get(counter_key(attribute)).to_i
end
end
end
def counter_key(attribute)
"project:{#{project_id}}:counters:#{self.class}:#{id}:#{attribute}"
end
def counter_flushed_key(attribute)
counter_key(attribute) + ':flushed'
end
def counter_lock_key(attribute)
counter_key(attribute) + ':lock'
end
def counter_attribute_enabled?(attribute)
self.class.counter_attribute_enabled?(attribute)
end
private
def database_lock_key
"project:{#{project_id}}:#{self.class}:#{id}"
end
def steal_increments(increment_key, flushed_key)
redis_state do |redis|
redis.eval(LUA_STEAL_INCREMENT_SCRIPT, keys: [increment_key, flushed_key])
end
end
def clear_counter!(attribute)
redis_state do |redis|
redis.del(counter_key(attribute))
end
end
def execute_after_flush_callbacks
self.class.after_flush_callbacks.each do |callback|
callback.call(self)
end
end
def redis_state(&block)
Gitlab::Redis::SharedState.with(&block)
end
def with_exclusive_lease(lock_key)
in_lock(lock_key, ttl: WORKER_LOCK_TTL) do
yield
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# a worker is already updating the counters
end
# detect_race_on_record uses a lease to monitor access
# to the project statistics row. This is needed to detect
# concurrent attempts to increment columns, which could result in a
# race condition.
#
# As the purpose is to detect and warn concurrent attempts,
# it falls back to direct update on the row if it fails to obtain the lease.
#
# It does not guarantee that there will not be any concurrent updates.
def detect_race_on_record(log_fields: {})
return yield unless Feature.enabled?(:counter_attribute_db_lease_for_update, project)
# Ensure attributes is always an array before we log
log_fields[:attributes] = Array(log_fields[:attributes])
Gitlab::AppLogger.info(
message: 'Acquiring lease for project statistics update',
project_statistics_id: id,
project_id: project.id,
**log_fields,
**Gitlab::ApplicationContext.current
)
in_lock(database_lock_key, retries: 0) do
yield
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
Gitlab::AppLogger.warn(
message: 'Concurrent project statistics update detected',
project_statistics_id: id,
project_id: project.id,
**log_fields,
**Gitlab::ApplicationContext.current
)
yield
end
def log_increment_counter(attribute, increment, new_value)
payload = Gitlab::ApplicationContext.current.merge(
message: 'Increment counter attribute',
attribute: attribute,
project_id: project_id,
increment: increment,
new_counter_value: new_value,
current_db_value: read_attribute(attribute)
)
Gitlab::AppLogger.info(payload)
end
def log_flush_counter(attribute, increment, previous_db_value, new_db_value)
payload = Gitlab::ApplicationContext.current.merge(
message: 'Flush counter attribute to database',
attribute: attribute,
project_id: project_id,
increment: increment,
previous_db_value: previous_db_value,
new_db_value: new_db_value
)
Gitlab::AppLogger.info(payload)
end
def log_clear_counter(attribute)
payload = Gitlab::ApplicationContext.current.merge(
message: 'Clear counter attribute',
attribute: attribute,
project_id: project_id
)
Gitlab::AppLogger.info(payload)
end
end
|