summaryrefslogtreecommitdiff
path: root/lib/gitlab/database/postgres_hll/batch_distinct_counter.rb
blob: 33faa2ef1b00d634411bcde61175df4af21cee7b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# frozen_string_literal: true

module Gitlab
  module Database
    module PostgresHll
      # For large tables, PostgreSQL can take a long time to count rows due to MVCC.
      # Implements a distinct batch counter based on HyperLogLog algorithm
      # Needs indexes on the column below to calculate max, min and range queries
      # For larger tables just set higher batch_size with index optimization
      #
      # In order to not use a possible complex time consuming query when calculating min and max values,
      # the start and finish can be sent specifically, start and finish should contain max and min values for PRIMARY KEY of
      # relation (most cases `id` column) rather than counted attribute eg:
      # estimate_distinct_count(start: ::Project.with_active_services.minimum(:id), finish: ::Project.with_active_services.maximum(:id))
      #
      # Grouped relations are NOT supported yet.
      #
      # @example Usage
      #  ::Gitlab::Database::PostgresHllBatchDistinctCount.new(::Project, :creator_id).estimate_distinct_count
      #  ::Gitlab::Database::PostgresHllBatchDistinctCount.new(::Project.with_active_services.service_desk_enabled.where(time_period))
      #    .estimate_distinct_count(
      #      batch_size: 1_000,
      #      start: ::Project.with_active_services.service_desk_enabled.where(time_period).minimum(:id),
      #      finish: ::Project.with_active_services.service_desk_enabled.where(time_period).maximum(:id)
      #    )
      #
      # @note HyperLogLog is an PROBABILISTIC algorithm that ESTIMATES distinct count of given attribute value for supplied relation
      #  Like all probabilistic algorithm is has ERROR RATE margin, that can affect values,
      #  for given implementation no higher value was reported (https://gitlab.com/gitlab-org/gitlab/-/merge_requests/45673#accuracy-estimation) than 5.3%
      #  for the most of a cases this value is lower. However, if the exact value is necessary other tools has to be used.
      class BatchDistinctCounter
        ERROR_RATE = 4.9 # max encountered empirical error rate, used in tests
        FALLBACK = -1
        MIN_REQUIRED_BATCH_SIZE = 750
        SLEEP_TIME_IN_SECONDS = 0.01 # 10 msec sleep
        MAX_DATA_VOLUME = 4_000_000_000

        # Each query should take < 500ms https://gitlab.com/gitlab-org/gitlab/-/merge_requests/22705
        DEFAULT_BATCH_SIZE = 10_000

        BIT_31_MASK = "B'0#{'1' * 31}'"
        BIT_9_MASK = "B'#{'0' * 23}#{'1' * 9}'"
        # @example source_query
        #   SELECT CAST(('X' || md5(CAST(%{column} as text))) as bit(32)) attr_hash_32_bits
        #   FROM %{relation}
        #   WHERE %{pkey} >= %{batch_start}
        #   AND %{pkey} < %{batch_end}
        #   AND %{column} IS NOT NULL
        BUCKETED_DATA_SQL = <<~SQL
          WITH hashed_attributes AS (%{source_query})
          SELECT (attr_hash_32_bits & #{BIT_9_MASK})::int AS bucket_num,
            (31 - floor(log(2, min((attr_hash_32_bits & #{BIT_31_MASK})::int))))::int as bucket_hash
          FROM hashed_attributes
          GROUP BY 1
        SQL

        TOTAL_BUCKETS_NUMBER = 512

        def initialize(relation, column = nil)
          @relation = relation
          @column = column || relation.primary_key
        end

        def unwanted_configuration?(finish, batch_size, start)
          batch_size <= MIN_REQUIRED_BATCH_SIZE ||
            (finish - start) >= MAX_DATA_VOLUME ||
            start > finish
        end

        def estimate_distinct_count(batch_size: nil, start: nil, finish: nil)
          raise 'BatchCount can not be run inside a transaction' if ActiveRecord::Base.connection.transaction_open?

          batch_size ||= DEFAULT_BATCH_SIZE

          start = actual_start(start)
          finish = actual_finish(finish)

          raise "Batch counting expects positive values only for #{@column}" if start < 0 || finish < 0
          return FALLBACK if unwanted_configuration?(finish, batch_size, start)

          batch_start = start
          hll_blob = {}

          while batch_start <= finish
            begin
              hll_blob.merge!(hll_blob_for_batch(batch_start, batch_start + batch_size)) {|_key, old, new| new > old ? new : old }
              batch_start += batch_size
            end
            sleep(SLEEP_TIME_IN_SECONDS)
          end

          estimate_cardinality(hll_blob)
        end

        private

        # arbitrary values that are present in #estimate_cardinality
        # are sourced from https://www.sisense.com/blog/hyperloglog-in-pure-sql/
        # article, they are not representing any entity and serves as tune value
        # for the whole equation
        def estimate_cardinality(hll_blob)
          num_zero_buckets = TOTAL_BUCKETS_NUMBER - hll_blob.size

          num_uniques = (
            ((TOTAL_BUCKETS_NUMBER**2) * (0.7213 / (1 + 1.079 / TOTAL_BUCKETS_NUMBER))) /
              (num_zero_buckets + hll_blob.values.sum { |bucket_hash| 2**(-1 * bucket_hash)} )
          ).to_i

          if num_zero_buckets > 0 && num_uniques < 2.5 * TOTAL_BUCKETS_NUMBER
            ((0.7213 / (1 + 1.079 / TOTAL_BUCKETS_NUMBER)) * (TOTAL_BUCKETS_NUMBER *
              Math.log2(TOTAL_BUCKETS_NUMBER.to_f / num_zero_buckets)))
          else
            num_uniques
          end
        end

        def hll_blob_for_batch(start, finish)
          @relation
            .connection
            .execute(BUCKETED_DATA_SQL % { source_query: source_query(start, finish) })
            .map(&:values)
            .to_h
        end

        # Generate the source query SQL snippet for the provided id range
        #
        # @example SQL query template
        #   SELECT CAST(('X' || md5(CAST(%{column} as text))) as bit(32)) attr_hash_32_bits
        #   FROM %{relation}
        #   WHERE %{pkey} >= %{batch_start} AND %{pkey} < %{batch_end}
        #   AND %{column} IS NOT NULL
        #
        # @param start initial id range
        # @param finish final id range
        # @return [String] SQL query fragment
        def source_query(start, finish)
          col_as_arel = @column.is_a?(Arel::Attributes::Attribute) ? @column : Arel.sql(@column.to_s)
          col_as_text = Arel::Nodes::NamedFunction.new('CAST', [col_as_arel.as('text')])
          md5_of_col = Arel::Nodes::NamedFunction.new('md5', [col_as_text])
          md5_as_hex = Arel::Nodes::Concat.new(Arel.sql("'X'"), md5_of_col)
          bits = Arel::Nodes::NamedFunction.new('CAST', [md5_as_hex.as('bit(32)')])

          @relation
            .where(@relation.primary_key => (start...finish))
            .where(col_as_arel.not_eq(nil))
            .select(bits.as('attr_hash_32_bits')).to_sql
        end

        def actual_start(start)
          start || @relation.unscope(:group, :having).minimum(@relation.primary_key) || 0
        end

        def actual_finish(finish)
          finish || @relation.unscope(:group, :having).maximum(@relation.primary_key) || 0
        end
      end
    end
  end
end