summaryrefslogtreecommitdiff
path: root/app/models/concerns/counter_attribute.rb
blob: 4bfeba338d2f8fc681541fb18830438f15fa0d84 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# frozen_string_literal: true

# Add capabilities to increment a numeric model attribute efficiently by
# using Redis and flushing the increments asynchronously to the database
# after a period of time (10 minutes).
# When an attribute is incremented by a value, the increment is added
# to a Redis key. Then, FlushCounterIncrementsWorker will execute
# `flush_increments_to_database!` which removes increments from Redis for a
# given model attribute and updates the values in the database.
#
# @example:
#
#   class ProjectStatistics
#     include CounterAttribute
#
#     counter_attribute :commit_count
#     counter_attribute :storage_size
#   end
#
# To increment the counter we can use the method:
#   delayed_increment_counter(:commit_count, 3)
#
# It is possible to register callbacks to be executed after increments have
# been flushed to the database. Callbacks are not executed if there are no increments
# to flush.
#
#  counter_attribute_after_flush do |statistic|
#    Namespaces::ScheduleAggregationWorker.perform_async(statistic.namespace_id)
#  end
#
module CounterAttribute
  extend ActiveSupport::Concern
  # AfterCommitQueue must be *included* (not extended): `delayed_increment_counter`
  # calls `run_after_commit_or_now` on the model instance, so the method has to
  # be mixed into the including class. With `extend`, the methods would only be
  # added to the CounterAttribute module object itself and the concern would
  # work only if the host class happened to include AfterCommitQueue on its own.
  # NOTE(review): assumes AfterCommitQueue is what defines
  # `run_after_commit_or_now` — confirm against that module.
  include AfterCommitQueue
  include Gitlab::ExclusiveLeaseHelpers

  # Atomically moves the pending increment into the "flushed" key:
  #  - reads the value of the increment key (0 when the key is missing),
  #  - adds it to the flushed key,
  #  - deletes the increment key (and the flushed key too when the total is 0),
  #  - returns the resulting flushed total.
  LUA_STEAL_INCREMENT_SCRIPT = <<~EOS
    local increment_key, flushed_key = KEYS[1], KEYS[2]
    local increment_value = redis.call("get", increment_key) or 0
    local flushed_value = redis.call("incrby", flushed_key, increment_value)
    if flushed_value == 0 then
      redis.call("del", increment_key, flushed_key)
    else
      redis.call("del", increment_key)
    end
    return flushed_value
  EOS

  # Delay before FlushCounterIncrementsWorker is scheduled to run.
  WORKER_DELAY = 10.minutes
  # TTL of the exclusive lease taken while flushing an attribute.
  WORKER_LOCK_TTL = 10.minutes

  class_methods do
    # Registers +attribute+ as a counter attribute whose increments are
    # buffered in Redis by `delayed_increment_counter`.
    def counter_attribute(attribute)
      counter_attributes << attribute
    end

    # Set of attributes registered through `counter_attribute`.
    def counter_attributes
      @counter_attributes ||= Set.new
    end

    # Callbacks registered through `counter_attribute_after_flush`,
    # in registration order.
    def after_flush_callbacks
      @after_flush_callbacks ||= []
    end

    # perform registered callbacks after increments have been flushed to the database
    def counter_attribute_after_flush(&callback)
      after_flush_callbacks << callback
    end
  end

  # This method must only be called by FlushCounterIncrementsWorker
  # because it should run asynchronously and with exclusive lease.
  # This will
  #  1. temporarily move the pending increment for a given attribute
  #     to a related "flushed" Redis key, delete the increment key and return
  #     the value. If new increments are performed at this point, the increment
  #     key is recreated as part of `delayed_increment_counter`.
  #     The "flushed" key is used to ensure that we can keep incrementing
  #     counters in Redis while flushing existing values.
  #  2. then the value is used to update the counter in the database.
  #  3. finally the "flushed" key is deleted.
  #
  # Nothing is written and no callback is executed when the stolen value is 0.
  # If the lease cannot be obtained the call is a no-op.
  def flush_increments_to_database!(attribute)
    lock_key = counter_lock_key(attribute)

    with_exclusive_lease(lock_key) do
      increment_key = counter_key(attribute)
      flushed_key = counter_flushed_key(attribute)
      increment_value = steal_increments(increment_key, flushed_key)

      next if increment_value == 0

      # The flushed key is deleted inside the transaction block, after the
      # counter update has been issued, so an exception raised by either
      # step leaves the flushed value in Redis for a later flush.
      transaction do
        unsafe_update_counters(id, attribute => increment_value)
        redis_state { |redis| redis.del(flushed_key) }
      end

      execute_after_flush_callbacks
    end
  end

  # Adds +increment+ to the counter for +attribute+.
  #
  # For registered counter attributes the increment is added to Redis and a
  # FlushCounterIncrementsWorker is scheduled after WORKER_DELAY; for any
  # other attribute the increment is applied through `legacy_increment!`.
  # The work is wrapped in `run_after_commit_or_now`.
  #
  # Returns nil without doing anything when +increment+ is 0, true otherwise.
  def delayed_increment_counter(attribute, increment)
    return if increment == 0

    run_after_commit_or_now do
      if counter_attribute_enabled?(attribute)
        redis_state do |redis|
          redis.incrby(counter_key(attribute), increment)
        end

        FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, self.class.name, self.id, attribute)
      else
        legacy_increment!(attribute, increment)
      end
    end

    true
  end

  # Redis key holding the pending increments for +attribute+ of this record.
  def counter_key(attribute)
    "project:{#{project_id}}:counters:#{self.class}:#{id}:#{attribute}"
  end

  # Redis key holding increments that are currently being flushed.
  def counter_flushed_key(attribute)
    "#{counter_key(attribute)}:flushed"
  end

  # Key of the exclusive lease taken while flushing +attribute+.
  def counter_lock_key(attribute)
    "#{counter_key(attribute)}:lock"
  end

  # Whether +attribute+ was registered with `counter_attribute` on this class.
  def counter_attribute_enabled?(attribute)
    self.class.counter_attributes.include?(attribute)
  end

  private

  # Runs LUA_STEAL_INCREMENT_SCRIPT against the two keys and returns the
  # value produced by the script.
  def steal_increments(increment_key, flushed_key)
    redis_state do |redis|
      redis.eval(LUA_STEAL_INCREMENT_SCRIPT, keys: [increment_key, flushed_key])
    end
  end

  # Fallback for attributes that are not registered counter attributes.
  def legacy_increment!(attribute, increment)
    increment!(attribute, increment)
  end

  # Applies +increments+ (attribute => delta) to the record with the given id
  # through the class-level `update_counters`.
  def unsafe_update_counters(id, increments)
    self.class.update_counters(id, increments)
  end

  # Calls every registered after-flush callback with this record.
  def execute_after_flush_callbacks
    self.class.after_flush_callbacks.each do |callback|
      callback.call(self)
    end
  end

  # Yields a connection from Gitlab::Redis::SharedState to the block.
  def redis_state(&block)
    Gitlab::Redis::SharedState.with(&block)
  end

  # Runs the block under the lease identified by +lock_key+. Failure to
  # obtain the lease is deliberately swallowed.
  def with_exclusive_lease(lock_key)
    in_lock(lock_key, ttl: WORKER_LOCK_TTL) { yield }
  rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
    # a worker is already updating the counters
  end
end