summaryrefslogtreecommitdiff
path: root/lib/gitlab/counters
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/counters')
-rw-r--r--lib/gitlab/counters/buffered_counter.rb166
-rw-r--r--lib/gitlab/counters/legacy_counter.rb23
2 files changed, 180 insertions, 9 deletions
diff --git a/lib/gitlab/counters/buffered_counter.rb b/lib/gitlab/counters/buffered_counter.rb
index 56593b642a9..3e232c78e45 100644
--- a/lib/gitlab/counters/buffered_counter.rb
+++ b/lib/gitlab/counters/buffered_counter.rb
@@ -8,6 +8,17 @@ module Gitlab
WORKER_DELAY = 10.minutes
WORKER_LOCK_TTL = 10.minutes
+ # Refresh keys are set to expire after a very long time,
+ # so that they do not occupy Redis memory indefinitely,
+ # if for any reason they are not deleted.
+ # In practice, a refresh is not expected to take longer than this TTL.
+ REFRESH_KEYS_TTL = 14.days
+ CLEANUP_BATCH_SIZE = 50
+ CLEANUP_INTERVAL_SECONDS = 0.1
+
+ # Limit size of bitmap key to 2^26-1 (~8MB)
+ MAX_BITMAP_OFFSET = 67108863
+
LUA_FLUSH_INCREMENT_SCRIPT = <<~LUA
local increment_key, flushed_key = KEYS[1], KEYS[2]
local increment_value = redis.call("get", increment_key) or 0
@@ -31,9 +42,47 @@ module Gitlab
end
end
- def increment(amount)
+ LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT = <<~LUA
+ local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]
+ local tracking_shard_key, opposing_tracking_shard_key, shards_key = KEYS[4], KEYS[5], KEYS[6]
+
+ local amount, tracking_offset = tonumber(ARGV[1]), tonumber(ARGV[2])
+
+ -- increment to the counter key when not refreshing
+ if redis.call("exists", refresh_indicator_key) == 0 then
+ return redis.call("incrby", counter_key, amount)
+ end
+
+ -- deduplicate and increment to the refresh counter key while refreshing
+ local found_duplicate = redis.call("getbit", tracking_shard_key, tracking_offset)
+ if found_duplicate == 1 then
+ return redis.call("get", refresh_key)
+ end
+
+ redis.call("setbit", tracking_shard_key, tracking_offset, 1)
+ redis.call("expire", tracking_shard_key, #{REFRESH_KEYS_TTL.seconds})
+ redis.call("sadd", shards_key, tracking_shard_key)
+ redis.call("expire", shards_key, #{REFRESH_KEYS_TTL.seconds})
+
+ local found_opposing_change = redis.call("getbit", opposing_tracking_shard_key, tracking_offset)
+ local increment_without_previous_decrement = amount > 0 and found_opposing_change == 0
+ local decrement_with_previous_increment = amount < 0 and found_opposing_change == 1
+ local net_change = 0
+
+ if increment_without_previous_decrement or decrement_with_previous_increment then
+ net_change = amount
+ end
+
+ return redis.call("incrby", refresh_key, net_change)
+ LUA
+
+ def increment(increment)
result = redis_state do |redis|
- redis.incrby(key, amount)
+ if Feature.enabled?(:project_statistics_bulk_increment, type: :development)
+ redis.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **increment_args(increment)).to_i
+ else
+ redis.incrby(key, increment.amount)
+ end
end
FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, counter_record.class.name, counter_record.id, attribute)
@@ -41,11 +90,63 @@ module Gitlab
result
end
- def reset!
+ def bulk_increment(increments)
+ result = redis_state do |redis|
+ redis.pipelined do |pipeline|
+ increments.each do |increment|
+ if Feature.enabled?(:project_statistics_bulk_increment, type: :development)
+ pipeline.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **increment_args(increment))
+ else
+ pipeline.incrby(key, increment.amount)
+ end
+ end
+ end
+ end
+
+ FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, counter_record.class.name, counter_record.id, attribute)
+
+ result.last.to_i
+ end
+
+ LUA_INITIATE_REFRESH_SCRIPT = <<~LUA
+ local counter_key, refresh_indicator_key = KEYS[1], KEYS[2]
+ redis.call("del", counter_key)
+ redis.call("set", refresh_indicator_key, 1, "ex", #{REFRESH_KEYS_TTL.seconds})
+ LUA
+
+ def initiate_refresh!
counter_record.update!(attribute => 0)
redis_state do |redis|
- redis.del(key)
+ redis.eval(LUA_INITIATE_REFRESH_SCRIPT, keys: [key, refresh_indicator_key])
+ end
+ end
+
+ LUA_FINALIZE_REFRESH_SCRIPT = <<~LUA
+ local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]
+ local refresh_amount = redis.call("get", refresh_key) or 0
+
+ redis.call("incrby", counter_key, refresh_amount)
+ redis.call("del", refresh_indicator_key, refresh_key)
+ LUA
+
+ def finalize_refresh
+ redis_state do |redis|
+ redis.eval(LUA_FINALIZE_REFRESH_SCRIPT, keys: [key, refresh_key, refresh_indicator_key])
+ end
+
+ FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, counter_record.class.name, counter_record.id, attribute)
+ ::Counters::CleanupRefreshWorker.perform_async(counter_record.class.name, counter_record.id, attribute)
+ end
+
+ def cleanup_refresh
+ redis_state do |redis|
+ while (shards = redis.spop(shards_key, CLEANUP_BATCH_SIZE))
+ redis.del(*shards)
+ break if shards.size < CLEANUP_BATCH_SIZE
+
+ sleep CLEANUP_INTERVAL_SECONDS
+ end
end
end
@@ -87,10 +188,67 @@ module Gitlab
"#{key}:flushed"
end
+ def refresh_indicator_key
+ "#{key}:refresh-in-progress"
+ end
+
+ def refresh_key
+ "#{key}:refresh"
+ end
+
private
attr_reader :counter_record, :attribute
+ def increment_args(increment)
+ {
+ keys: [
+ key,
+ refresh_key,
+ refresh_indicator_key,
+ tracking_shard_key(increment),
+ opposing_tracking_shard_key(increment),
+ shards_key
+ ],
+ argv: [
+ increment.amount,
+ tracking_offset(increment)
+ ]
+ }
+ end
+
+ def tracking_shard_key(increment)
+ positive?(increment) ? positive_shard_key(increment.ref.to_i) : negative_shard_key(increment.ref.to_i)
+ end
+
+ def opposing_tracking_shard_key(increment)
+ positive?(increment) ? negative_shard_key(increment.ref.to_i) : positive_shard_key(increment.ref.to_i)
+ end
+
+ def shards_key
+ "#{refresh_key}:shards"
+ end
+
+ def positive_shard_key(ref)
+ "#{refresh_key}:+:#{shard_number(ref)}"
+ end
+
+ def negative_shard_key(ref)
+ "#{refresh_key}:-:#{shard_number(ref)}"
+ end
+
+ def shard_number(ref)
+ ref / MAX_BITMAP_OFFSET
+ end
+
+ def tracking_offset(increment)
+ increment.ref.to_i % MAX_BITMAP_OFFSET
+ end
+
+ def positive?(increment)
+ increment.amount > 0
+ end
+
def remove_flushed_key
redis_state do |redis|
redis.del(flushed_key)
diff --git a/lib/gitlab/counters/legacy_counter.rb b/lib/gitlab/counters/legacy_counter.rb
index 06951514ec3..823f9955168 100644
--- a/lib/gitlab/counters/legacy_counter.rb
+++ b/lib/gitlab/counters/legacy_counter.rb
@@ -11,23 +11,36 @@ module Gitlab
@current_value = counter_record.method(attribute).call
end
- def increment(amount)
- updated = counter_record.class.update_counters(counter_record.id, { attribute => amount })
+ def increment(increment)
+ updated = update_counter_record_attribute(increment.amount)
if updated == 1
counter_record.execute_after_commit_callbacks
- @current_value += amount
+ @current_value += increment.amount
end
@current_value
end
- def reset!
- counter_record.update!(attribute => 0)
+ def bulk_increment(increments)
+ total_increment = increments.sum(&:amount)
+
+ updated = update_counter_record_attribute(total_increment)
+
+ if updated == 1
+ counter_record.execute_after_commit_callbacks
+ @current_value += total_increment
+ end
+
+ @current_value
end
private
+ def update_counter_record_attribute(amount)
+ counter_record.class.update_counters(counter_record.id, { attribute => amount })
+ end
+
attr_reader :counter_record, :attribute
end
end