diff options
Diffstat (limited to 'app/models/concerns/counter_attribute.rb')
-rw-r--r-- | app/models/concerns/counter_attribute.rb | 143 |
1 files changed, 143 insertions, 0 deletions
diff --git a/app/models/concerns/counter_attribute.rb b/app/models/concerns/counter_attribute.rb new file mode 100644 index 00000000000..a5c7393e8f7 --- /dev/null +++ b/app/models/concerns/counter_attribute.rb @@ -0,0 +1,143 @@ +# frozen_string_literal: true + +# Add capabilities to increment a numeric model attribute efficiently by +# using Redis and flushing the increments asynchronously to the database +# after a period of time (10 minutes). +# When an attribute is incremented by a value, the increment is added +# to a Redis key. Then, FlushCounterIncrementsWorker will execute +# `flush_increments_to_database!` which removes increments from Redis for a +# given model attribute and updates the values in the database. +# +# @example: +# +# class ProjectStatistics +# include CounterAttribute +# +# counter_attribute :commit_count +# counter_attribute :storage_size +# end +# +# To increment the counter we can use the method: +# delayed_increment_counter(:commit_count, 3) +# +module CounterAttribute + extend ActiveSupport::Concern + extend AfterCommitQueue + include Gitlab::ExclusiveLeaseHelpers + + LUA_STEAL_INCREMENT_SCRIPT = <<~EOS.freeze + local increment_key, flushed_key = KEYS[1], KEYS[2] + local increment_value = redis.call("get", increment_key) or 0 + local flushed_value = redis.call("incrby", flushed_key, increment_value) + if flushed_value == 0 then + redis.call("del", increment_key, flushed_key) + else + redis.call("del", increment_key) + end + return flushed_value + EOS + + WORKER_DELAY = 10.minutes + WORKER_LOCK_TTL = 10.minutes + + class_methods do + def counter_attribute(attribute) + counter_attributes << attribute + end + + def counter_attributes + @counter_attributes ||= Set.new + end + end + + # This method must only be called by FlushCounterIncrementsWorker + # because it should run asynchronously and with exclusive lease. + # This will + # 1. temporarily move the pending increment for a given attribute + # to a relative "flushed" Redis key, delete the increment key and return + # the value. If new increments are performed at this point, the increment + # key is recreated as part of `delayed_increment_counter`. + # The "flushed" key is used to ensure that we can keep incrementing + # counters in Redis while flushing existing values. + # 2. then the value is used to update the counter in the database. + # 3. finally the "flushed" key is deleted. + def flush_increments_to_database!(attribute) + lock_key = counter_lock_key(attribute) + + with_exclusive_lease(lock_key) do + increment_key = counter_key(attribute) + flushed_key = counter_flushed_key(attribute) + increment_value = steal_increments(increment_key, flushed_key) + + next if increment_value == 0 + + transaction do + unsafe_update_counters(id, attribute => increment_value) + redis_state { |redis| redis.del(flushed_key) } + end + end + end + + def delayed_increment_counter(attribute, increment) + return if increment == 0 + + run_after_commit_or_now do + if counter_attribute_enabled?(attribute) + redis_state do |redis| + redis.incrby(counter_key(attribute), increment) + end + + FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, self.class.name, self.id, attribute) + else + legacy_increment!(attribute, increment) + end + end + + true + end + + def counter_key(attribute) + "project:{#{project_id}}:counters:#{self.class}:#{id}:#{attribute}" + end + + def counter_flushed_key(attribute) + counter_key(attribute) + ':flushed' + end + + def counter_lock_key(attribute) + counter_key(attribute) + ':lock' + end + + private + + def counter_attribute_enabled?(attribute) + Feature.enabled?(:efficient_counter_attribute, project) && + self.class.counter_attributes.include?(attribute) + end + + def steal_increments(increment_key, flushed_key) + redis_state do |redis| + redis.eval(LUA_STEAL_INCREMENT_SCRIPT, keys: [increment_key, flushed_key]) + end + end + + def legacy_increment!(attribute, increment) + increment!(attribute, increment) + end + + def unsafe_update_counters(id, increments) + self.class.update_counters(id, increments) + end + + def redis_state(&block) + Gitlab::Redis::SharedState.with(&block) + end + + def with_exclusive_lease(lock_key) + in_lock(lock_key, ttl: WORKER_LOCK_TTL) do + yield + end + rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError + # a worker is already updating the counters + end +end |