1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
|
# frozen_string_literal: true
module Gitlab
  module Counters
    # Buffers increments to a counter attribute of a record in Redis, and
    # schedules FlushCounterIncrementsWorker to periodically persist the
    # accumulated amount into the database.
    #
    # It also supports a "refresh" mode: while a refresh is in progress,
    # increments are tracked per ref in Redis bitmaps so that duplicate
    # events for the same ref are counted only once (see
    # LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT).
    class BufferedCounter
      include Gitlab::ExclusiveLeaseHelpers

      # Delay before a scheduled flush worker runs after an increment.
      WORKER_DELAY = 10.minutes
      # TTL of the exclusive lease held while flushing to the database.
      WORKER_LOCK_TTL = 10.minutes

      # Refresh keys are set to expire after a very long time,
      # so that they do not occupy Redis memory indefinitely,
      # if for any reason they are not deleted.
      # In practice, a refresh is not expected to take longer than this TTL.
      REFRESH_KEYS_TTL = 14.days

      # Number of tracking shard keys popped and deleted per batch
      # during cleanup_refresh.
      CLEANUP_BATCH_SIZE = 50
      # Pause between cleanup batches to avoid saturating Redis.
      CLEANUP_INTERVAL_SECONDS = 0.1

      # Limit size of bitmap key to 2^26-1 (~8MB)
      MAX_BITMAP_OFFSET = 67108863

      # Atomically folds the pending increment key into the flushed key and
      # returns the total amount awaiting persistence. When the resulting
      # total is 0 both keys are deleted, otherwise only the increment key is.
      LUA_FLUSH_INCREMENT_SCRIPT = <<~LUA
        local increment_key, flushed_key = KEYS[1], KEYS[2]
        local increment_value = redis.call("get", increment_key) or 0
        local flushed_value = redis.call("incrby", flushed_key, increment_value)
        if flushed_value == 0 then
          redis.call("del", increment_key, flushed_key)
        else
          redis.call("del", increment_key)
        end
        return flushed_value
      LUA

      # @param counter_record [Object] the record owning the counter; must
      #   respond to #project, #id, #update!, #update_counters_with_lease,
      #   #execute_after_commit_callbacks and #reset (duck-typed here).
      # @param attribute [Symbol, String] name of the counter attribute
      def initialize(counter_record, attribute)
        @counter_record = counter_record
        @attribute = attribute
      end

      # Returns the currently buffered (not yet flushed) amount as an Integer.
      # A missing key reads as 0 via String#to_i on nil.
      def get
        redis_state do |redis|
          redis.get(key).to_i
        end
      end

      # Increment with per-ref deduplication, used while a refresh may be
      # in progress:
      # - when no refresh is running, it simply INCRBYs the main counter key;
      # - during a refresh, one bit per ref is kept in positive ("+") and
      #   negative ("-") bitmap shards, so a duplicate event for the same ref
      #   is ignored, and a decrement only takes effect if the matching
      #   increment for that ref was seen before (and vice versa).
      LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT = <<~LUA
        local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]
        local tracking_shard_key, opposing_tracking_shard_key, shards_key = KEYS[4], KEYS[5], KEYS[6]
        local amount, tracking_offset = tonumber(ARGV[1]), tonumber(ARGV[2])
        -- increment to the counter key when not refreshing
        if redis.call("exists", refresh_indicator_key) == 0 then
          return redis.call("incrby", counter_key, amount)
        end
        -- deduplicate and increment to the refresh counter key while refreshing
        local found_duplicate = redis.call("getbit", tracking_shard_key, tracking_offset)
        if found_duplicate == 1 then
          return redis.call("get", refresh_key)
        end
        redis.call("setbit", tracking_shard_key, tracking_offset, 1)
        redis.call("expire", tracking_shard_key, #{REFRESH_KEYS_TTL.seconds})
        redis.call("sadd", shards_key, tracking_shard_key)
        redis.call("expire", shards_key, #{REFRESH_KEYS_TTL.seconds})
        local found_opposing_change = redis.call("getbit", opposing_tracking_shard_key, tracking_offset)
        local increment_without_previous_decrement = amount > 0 and found_opposing_change == 0
        local decrement_with_previous_increment = amount < 0 and found_opposing_change == 1
        local net_change = 0
        if increment_without_previous_decrement or decrement_with_previous_increment then
          net_change = amount
        end
        return redis.call("incrby", refresh_key, net_change)
      LUA

      # Applies a single increment in Redis and schedules a delayed flush.
      #
      # @param increment [Object] must respond to #amount (Integer) and,
      #   for the deduplicating path, #ref.
      # @return [Integer] the resulting buffered value
      def increment(increment)
        result = redis_state do |redis|
          if Feature.enabled?(:project_statistics_bulk_increment, type: :development)
            redis.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **increment_args(increment)).to_i
          else
            redis.incrby(key, increment.amount)
          end
        end

        FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, counter_record.class.name, counter_record.id, attribute)

        result
      end

      # Applies several increments in one Redis pipeline and schedules a
      # delayed flush.
      #
      # @param increments [Array] elements as accepted by #increment
      # @return [Integer] the buffered value after the last increment
      #   (the pipeline's final reply, coerced with #to_i)
      def bulk_increment(increments)
        result = redis_state do |redis|
          redis.pipelined do |pipeline|
            increments.each do |increment|
              if Feature.enabled?(:project_statistics_bulk_increment, type: :development)
                pipeline.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **increment_args(increment))
              else
                pipeline.incrby(key, increment.amount)
              end
            end
          end
        end

        FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, counter_record.class.name, counter_record.id, attribute)

        result.last.to_i
      end

      # Clears the main counter key and raises the refresh-in-progress
      # indicator (with a safety TTL), atomically.
      LUA_INITIATE_REFRESH_SCRIPT = <<~LUA
        local counter_key, refresh_indicator_key = KEYS[1], KEYS[2]
        redis.call("del", counter_key)
        redis.call("set", refresh_indicator_key, 1, "ex", #{REFRESH_KEYS_TTL.seconds})
      LUA

      # Starts a refresh: zeroes the persisted attribute, then resets the
      # Redis buffer and marks the refresh as in progress. Subsequent
      # increments are routed to the refresh key with deduplication.
      def initiate_refresh!
        counter_record.update!(attribute => 0)

        redis_state do |redis|
          redis.eval(LUA_INITIATE_REFRESH_SCRIPT, keys: [key, refresh_indicator_key])
        end
      end

      # Folds the refresh total back into the main counter key and clears
      # the refresh indicator, atomically. A missing refresh key counts as 0.
      LUA_FINALIZE_REFRESH_SCRIPT = <<~LUA
        local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]
        local refresh_amount = redis.call("get", refresh_key) or 0
        redis.call("incrby", counter_key, refresh_amount)
        redis.call("del", refresh_indicator_key, refresh_key)
      LUA

      # Ends a refresh: merges the refreshed amount into the main buffer,
      # schedules a flush to the database, and enqueues asynchronous cleanup
      # of the per-ref tracking bitmaps.
      def finalize_refresh
        redis_state do |redis|
          redis.eval(LUA_FINALIZE_REFRESH_SCRIPT, keys: [key, refresh_key, refresh_indicator_key])
        end

        FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, counter_record.class.name, counter_record.id, attribute)
        ::Counters::CleanupRefreshWorker.perform_async(counter_record.class.name, counter_record.id, attribute)
      end

      # Deletes the per-ref tracking shard keys accumulated during a refresh,
      # in SPOP batches of CLEANUP_BATCH_SIZE, sleeping briefly between
      # batches. Stops when a batch comes back short (set exhausted).
      def cleanup_refresh
        redis_state do |redis|
          while (shards = redis.spop(shards_key, CLEANUP_BATCH_SIZE))
            redis.del(*shards)
            break if shards.size < CLEANUP_BATCH_SIZE

            sleep CLEANUP_INTERVAL_SECONDS
          end
        end
      end

      # Persists the buffered amount into the database under an exclusive
      # lease (no-op if another worker holds it), then returns the freshly
      # reloaded attribute value.
      def commit_increment!
        with_exclusive_lease do
          flush_amount = amount_to_be_flushed
          next if flush_amount == 0

          counter_record.transaction do
            counter_record.update_counters_with_lease({ attribute => flush_amount })
            remove_flushed_key
          end

          counter_record.execute_after_commit_callbacks
        end

        counter_record.reset.read_attribute(attribute)
      end

      # amount_to_be_flushed returns the total value to be flushed.
      # The total value is the sum of the following:
      # - current value in the increment_key
      # - any existing value in the flushed_key that has not been flushed
      def amount_to_be_flushed
        redis_state do |redis|
          redis.eval(LUA_FLUSH_INCREMENT_SCRIPT, keys: [key, flushed_key])
        end
      end

      # Main buffer key. The braces around the project id form a Redis hash
      # tag, so all keys for one project map to the same cluster slot.
      def key
        project_id = counter_record.project.id
        record_name = counter_record.class
        record_id = counter_record.id

        "project:{#{project_id}}:counters:#{record_name}:#{record_id}:#{attribute}"
      end

      # Key holding an amount popped from the buffer but not yet committed
      # to the database (survives a crash mid-flush).
      def flushed_key
        "#{key}:flushed"
      end

      # Presence of this key signals that a refresh is in progress.
      def refresh_indicator_key
        "#{key}:refresh-in-progress"
      end

      # Key accumulating deduplicated increments while refreshing.
      def refresh_key
        "#{key}:refresh"
      end

      private

      attr_reader :counter_record, :attribute

      # Builds the keys/argv hash for LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT.
      def increment_args(increment)
        {
          keys: [
            key,
            refresh_key,
            refresh_indicator_key,
            tracking_shard_key(increment),
            opposing_tracking_shard_key(increment),
            shards_key
          ],
          argv: [
            increment.amount,
            tracking_offset(increment)
          ]
        }
      end

      # Bitmap shard matching the increment's sign ("+" for increments,
      # "-" for decrements).
      def tracking_shard_key(increment)
        positive?(increment) ? positive_shard_key(increment.ref.to_i) : negative_shard_key(increment.ref.to_i)
      end

      # Bitmap shard of the opposite sign, consulted to net out an
      # increment/decrement pair for the same ref.
      def opposing_tracking_shard_key(increment)
        positive?(increment) ? negative_shard_key(increment.ref.to_i) : positive_shard_key(increment.ref.to_i)
      end

      # Set of all tracking shard keys created during the current refresh,
      # consumed by cleanup_refresh.
      def shards_key
        "#{refresh_key}:shards"
      end

      def positive_shard_key(ref)
        "#{refresh_key}:+:#{shard_number(ref)}"
      end

      def negative_shard_key(ref)
        "#{refresh_key}:-:#{shard_number(ref)}"
      end

      # Shards refs across multiple bitmaps so each bitmap stays within
      # MAX_BITMAP_OFFSET bits (~8MB).
      def shard_number(ref)
        ref / MAX_BITMAP_OFFSET
      end

      # Bit offset of the ref within its shard's bitmap.
      def tracking_offset(increment)
        increment.ref.to_i % MAX_BITMAP_OFFSET
      end

      def positive?(increment)
        increment.amount > 0
      end

      # Drops the flushed key once its amount has been committed to the DB.
      def remove_flushed_key
        redis_state do |redis|
          redis.del(flushed_key)
        end
      end

      def redis_state(&block)
        Gitlab::Redis::SharedState.with(&block)
      end

      # Runs the block under an exclusive lease; silently skips when another
      # worker already holds it.
      def with_exclusive_lease(&block)
        lock_key = "#{key}:locked"

        in_lock(lock_key, ttl: WORKER_LOCK_TTL, &block)
      rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
        # a worker is already updating the counters
      end
    end
  end
end
|