diff options
Diffstat (limited to 'lib/gitlab/counters')
-rw-r--r-- | lib/gitlab/counters/buffered_counter.rb | 166 | ||||
-rw-r--r-- | lib/gitlab/counters/legacy_counter.rb | 23 |
2 files changed, 180 insertions, 9 deletions
diff --git a/lib/gitlab/counters/buffered_counter.rb b/lib/gitlab/counters/buffered_counter.rb index 56593b642a9..3e232c78e45 100644 --- a/lib/gitlab/counters/buffered_counter.rb +++ b/lib/gitlab/counters/buffered_counter.rb @@ -8,6 +8,17 @@ module Gitlab WORKER_DELAY = 10.minutes WORKER_LOCK_TTL = 10.minutes + # Refresh keys are set to expire after a very long time, + # so that they do not occupy Redis memory indefinitely, + # if for any reason they are not deleted. + # In practice, a refresh is not expected to take longer than this TTL. + REFRESH_KEYS_TTL = 14.days + CLEANUP_BATCH_SIZE = 50 + CLEANUP_INTERVAL_SECONDS = 0.1 + + # Limit size of bitmap key to 2^26-1 (~8MB) + MAX_BITMAP_OFFSET = 67108863 + LUA_FLUSH_INCREMENT_SCRIPT = <<~LUA local increment_key, flushed_key = KEYS[1], KEYS[2] local increment_value = redis.call("get", increment_key) or 0 @@ -31,9 +42,47 @@ module Gitlab end end - def increment(amount) + LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT = <<~LUA + local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3] + local tracking_shard_key, opposing_tracking_shard_key, shards_key = KEYS[4], KEYS[5], KEYS[6] + + local amount, tracking_offset = tonumber(ARGV[1]), tonumber(ARGV[2]) + + -- increment to the counter key when not refreshing + if redis.call("exists", refresh_indicator_key) == 0 then + return redis.call("incrby", counter_key, amount) + end + + -- deduplicate and increment to the refresh counter key while refreshing + local found_duplicate = redis.call("getbit", tracking_shard_key, tracking_offset) + if found_duplicate == 1 then + return redis.call("get", refresh_key) + end + + redis.call("setbit", tracking_shard_key, tracking_offset, 1) + redis.call("expire", tracking_shard_key, #{REFRESH_KEYS_TTL.seconds}) + redis.call("sadd", shards_key, tracking_shard_key) + redis.call("expire", shards_key, #{REFRESH_KEYS_TTL.seconds}) + + local found_opposing_change = redis.call("getbit", opposing_tracking_shard_key, tracking_offset) + local increment_without_previous_decrement = amount > 0 and found_opposing_change == 0 + local decrement_with_previous_increment = amount < 0 and found_opposing_change == 1 + local net_change = 0 + + if increment_without_previous_decrement or decrement_with_previous_increment then + net_change = amount + end + + return redis.call("incrby", refresh_key, net_change) + LUA + + def increment(increment) result = redis_state do |redis| - redis.incrby(key, amount) + if Feature.enabled?(:project_statistics_bulk_increment, type: :development) + redis.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **increment_args(increment)).to_i + else + redis.incrby(key, increment.amount) + end end FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, counter_record.class.name, counter_record.id, attribute) @@ -41,11 +90,63 @@ module Gitlab result end - def reset! + def bulk_increment(increments) + result = redis_state do |redis| + redis.pipelined do |pipeline| + increments.each do |increment| + if Feature.enabled?(:project_statistics_bulk_increment, type: :development) + pipeline.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **increment_args(increment)) + else + pipeline.incrby(key, increment.amount) + end + end + end + end + + FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, counter_record.class.name, counter_record.id, attribute) + + result.last.to_i + end + + LUA_INITIATE_REFRESH_SCRIPT = <<~LUA + local counter_key, refresh_indicator_key = KEYS[1], KEYS[2] + redis.call("del", counter_key) + redis.call("set", refresh_indicator_key, 1, "ex", #{REFRESH_KEYS_TTL.seconds}) + LUA + + def initiate_refresh! counter_record.update!(attribute => 0) redis_state do |redis| - redis.del(key) + redis.eval(LUA_INITIATE_REFRESH_SCRIPT, keys: [key, refresh_indicator_key]) + end + end + + LUA_FINALIZE_REFRESH_SCRIPT = <<~LUA + local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3] + local refresh_amount = redis.call("get", refresh_key) or 0 + + redis.call("incrby", counter_key, refresh_amount) + redis.call("del", refresh_indicator_key, refresh_key) + LUA + + def finalize_refresh + redis_state do |redis| + redis.eval(LUA_FINALIZE_REFRESH_SCRIPT, keys: [key, refresh_key, refresh_indicator_key]) + end + + FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, counter_record.class.name, counter_record.id, attribute) + ::Counters::CleanupRefreshWorker.perform_async(counter_record.class.name, counter_record.id, attribute) + end + + def cleanup_refresh + redis_state do |redis| + while (shards = redis.spop(shards_key, CLEANUP_BATCH_SIZE)) + redis.del(*shards) + break if shards.size < CLEANUP_BATCH_SIZE + + sleep CLEANUP_INTERVAL_SECONDS + end end end @@ -87,10 +188,67 @@ module Gitlab "#{key}:flushed" end + def refresh_indicator_key + "#{key}:refresh-in-progress" + end + + def refresh_key + "#{key}:refresh" + end + private attr_reader :counter_record, :attribute + def increment_args(increment) + { + keys: [ + key, + refresh_key, + refresh_indicator_key, + tracking_shard_key(increment), + opposing_tracking_shard_key(increment), + shards_key + ], + argv: [ + increment.amount, + tracking_offset(increment) + ] + } + end + + def tracking_shard_key(increment) + positive?(increment) ? positive_shard_key(increment.ref.to_i) : negative_shard_key(increment.ref.to_i) + end + + def opposing_tracking_shard_key(increment) + positive?(increment) ? negative_shard_key(increment.ref.to_i) : positive_shard_key(increment.ref.to_i) + end + + def shards_key + "#{refresh_key}:shards" + end + + def positive_shard_key(ref) + "#{refresh_key}:+:#{shard_number(ref)}" + end + + def negative_shard_key(ref) + "#{refresh_key}:-:#{shard_number(ref)}" + end + + def shard_number(ref) + ref / MAX_BITMAP_OFFSET + end + + def tracking_offset(increment) + increment.ref.to_i % MAX_BITMAP_OFFSET + end + + def positive?(increment) + increment.amount > 0 + end + def remove_flushed_key redis_state do |redis| redis.del(flushed_key) diff --git a/lib/gitlab/counters/legacy_counter.rb b/lib/gitlab/counters/legacy_counter.rb index 06951514ec3..823f9955168 100644 --- a/lib/gitlab/counters/legacy_counter.rb +++ b/lib/gitlab/counters/legacy_counter.rb @@ -11,23 +11,36 @@ module Gitlab @current_value = counter_record.method(attribute).call end - def increment(amount) - updated = counter_record.class.update_counters(counter_record.id, { attribute => amount }) + def increment(increment) + updated = update_counter_record_attribute(increment.amount) if updated == 1 counter_record.execute_after_commit_callbacks - @current_value += amount + @current_value += increment.amount end @current_value end - def reset! - counter_record.update!(attribute => 0) + def bulk_increment(increments) + total_increment = increments.sum(&:amount) + + updated = update_counter_record_attribute(total_increment) + + if updated == 1 + counter_record.execute_after_commit_callbacks + @current_value += total_increment + end + + @current_value end private + def update_counter_record_attribute(amount) + counter_record.class.update_counters(counter_record.id, { attribute => amount }) + end + attr_reader :counter_record, :attribute end end |