1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
# frozen_string_literal: true
module Gitlab
module Usage
module Metrics
module Aggregates
# Operator values an aggregation definition may use to combine its events:
# 'OR' selects the union of the events, 'AND' selects their intersection.
UNION_OF_AGGREGATED_METRICS = 'OR'
INTERSECTION_OF_AGGREGATED_METRICS = 'AND'
ALLOWED_METRICS_AGGREGATIONS = [UNION_OF_AGGREGATED_METRICS, INTERSECTION_OF_AGGREGATED_METRICS].freeze
# Glob matching the YAML files that hold the aggregated metric definitions.
AGGREGATED_METRICS_PATH = Rails.root.join('lib/gitlab/usage_data_counters/aggregated_metrics/*.yml')
# Base error for aggregated metrics. The two subclasses are reported when a
# definition names an operator or a source that is not supported.
AggregatedMetricError = Class.new(StandardError)
UnknownAggregationOperator = Class.new(AggregatedMetricError)
UnknownAggregationSource = Class.new(AggregatedMetricError)
# Source names a definition may declare, mapped to the object that is asked
# to calculate the counts for that source.
DATABASE_SOURCE = 'database'
REDIS_SOURCE = 'redis'
SOURCES = {
DATABASE_SOURCE => Sources::PostgresHll,
REDIS_SOURCE => Sources::RedisHll
}.freeze
class Aggregate
# The time-range helpers are forwarded to HLLRedisCounter; weekly_data and
# monthly_data splat their results as the start_date:/end_date: keywords.
delegate :weekly_time_range,
:monthly_time_range,
to: Gitlab::UsageDataCounters::HLLRedisCounter
# Builds an aggregate calculator and loads every metric definition matched
# by AGGREGATED_METRICS_PATH.
#
# recorded_at - value passed on as `recorded_at:` whenever a source is asked
#               to calculate a union of metrics.
def initialize(recorded_at)
  @recorded_at = recorded_at
  @aggregated_metrics = load_metrics(AGGREGATED_METRICS_PATH)
end
# Returns the aggregated metrics data (name => value) calculated over the
# range reported by monthly_time_range.
def monthly_data
  time_range = monthly_time_range
  aggregated_metrics_data(**time_range)
end
# Returns the aggregated metrics data (name => value) calculated over the
# range reported by weekly_time_range.
def weekly_data
  time_range = weekly_time_range
  aggregated_metrics_data(**time_range)
end
private
# Internal state assigned in initialize: the loaded metric definitions and
# the recorded_at value that is passed on to the sources.
attr_accessor :aggregated_metrics, :recorded_at
# Builds a hash of aggregation name => calculated value for the given range.
#
# A definition is skipped when its own feature flag is disabled, or when it
# is database sourced and 'database_sourced_aggregated_metrics' is not
# enabled. A definition with an unknown source is reported to error tracking
# and gets the FALLBACK value.
def aggregated_metrics_data(start_date:, end_date:)
  aggregated_metrics.each_with_object({}) do |aggregation, data|
    feature_flag = aggregation[:feature_flag]
    next if feature_flag && Feature.disabled?(feature_flag, default_enabled: :yaml, type: :development)

    source_name = aggregation[:source]
    redis_sourced = source_name == REDIS_SOURCE
    database_sourced = source_name == DATABASE_SOURCE

    unless redis_sourced || database_sourced
      unknown_source = UnknownAggregationSource.new("Aggregation source: '#{aggregation[:source]}' must be included in #{SOURCES.keys}")
      Gitlab::ErrorTracking.track_and_raise_for_dev_exception(unknown_source)
      # Only reached when the tracker did not raise.
      data[aggregation[:name]] = Gitlab::Utils::UsageData::FALLBACK
      next
    end

    # Database sourced aggregations are additionally gated behind their own flag.
    if !redis_sourced && database_sourced
      next unless Feature.enabled?('database_sourced_aggregated_metrics', default_enabled: false, type: :development)
    end

    data[aggregation[:name]] = calculate_count_for_aggregation(aggregation: aggregation, start_date: start_date, end_date: end_date)
  end
end
# Calculates the value of a single aggregation over the given range.
#
# The source object is looked up in SOURCES by the aggregation's :source and
# the :operator decides between a union and an intersection of :events. An
# unknown operator, an HLLRedisCounter::EventError or any
# AggregatedMetricError is reported to error tracking; when the tracker does
# not raise, FALLBACK is returned.
def calculate_count_for_aggregation(aggregation:, start_date:, end_date:)
  source = SOURCES[aggregation[:source]]
  operator = aggregation[:operator]

  if operator == UNION_OF_AGGREGATED_METRICS
    source.calculate_metrics_union(
      metric_names: aggregation[:events],
      start_date: start_date,
      end_date: end_date,
      recorded_at: recorded_at
    )
  elsif operator == INTERSECTION_OF_AGGREGATED_METRICS
    calculate_metrics_intersections(
      source: source,
      metric_names: aggregation[:events],
      start_date: start_date,
      end_date: end_date
    )
  else
    unknown_operator = UnknownAggregationOperator.new("Events should be aggregated with one of operators #{ALLOWED_METRICS_AGGREGATIONS}")
    Gitlab::ErrorTracking.track_and_raise_for_dev_exception(unknown_operator)
    Gitlab::Utils::UsageData::FALLBACK
  end
rescue Gitlab::UsageDataCounters::HLLRedisCounter::EventError, AggregatedMetricError => e
  Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e)
  Gitlab::Utils::UsageData::FALLBACK
end
# Calculates the size of the intersection of 'n' sets (metrics) based on the
# inclusion-exclusion principle https://en.wikipedia.org/wiki/Inclusion%E2%80%93exclusion_principle
# This method will be extracted to a dedicated module with https://gitlab.com/gitlab-org/gitlab/-/issues/273391
#
# source              - object responding to calculate_metrics_union(metric_names:, start_date:, end_date:, recorded_at:)
# metric_names        - array of metric names whose intersection is calculated
# start_date/end_date - range boundaries, passed through to the source unchanged
# subset_powers_cache - memo of already calculated values, keyed by subset size
#                       and then by the joined metric names; the same object is
#                       handed to the recursive calls made for smaller subsets.
#                       Each subset size gets its own bucket, so a metric whose
#                       name looks like a joined subset key (for example
#                       'a_&_b') can not be mistaken for a cached intersection.
#
# Returns the intersection size derived from the values the source returns.
def calculate_metrics_intersections(source:, metric_names:, start_date:, end_date:, subset_powers_cache: Hash.new { |cache, subset_size| cache[subset_size] = {} })
  # The power of the intersection of all given metrics follows from the inclusion-exclusion principle:
  # |A + B + C| = (|A| + |B| + |C|) - (|A & B| + |A & C| + |B & C|) + |A & B & C| =>
  # |A & B & C| = - (|A| + |B| + |C|) + (|A & B| + |A & C| + |B & C|) + |A + B + C|
  # |A + B + C + D| = (|A| + |B| + |C| + |D|) - (|A & B| + |A & C| + .. + |C & D|) + (|A & B & C| + .. + |B & C & D|) - |A & B & C & D| =>
  # |A & B & C & D| = (|A| + |B| + |C| + |D|) - (|A & B| + |A & C| + .. + |C & D|) + (|A & B & C| + .. + |B & C & D|) - |A + B + C + D|

  # Calculate every component of the equation except for the last one (the union of all metrics):
  # one entry per subset size, from single metrics up to subsets of n - 1 metrics.
  subset_powers_data = subsets_intersection_powers(source, metric_names, start_date, end_date, subset_powers_cache)

  # Calculate the last component of the equation: |A + B + C + D|
  power_of_union_of_all_metrics =
    subset_powers_cache[metric_names.size][metric_names.join('_+_')] ||=
      source.calculate_metrics_union(metric_names: metric_names, start_date: start_date, end_date: end_date, recorded_at: recorded_at)

  # Whether the components enter the rearranged equation with their original or
  # with flipped signs depends on the number of components being even or odd
  # (compare the three and four set examples above).
  subset_powers_size_even = subset_powers_data.size.even?

  # Sum all components of the equation except for the last one, with the signs already applied.
  sum_of_all_subset_powers = sum_subset_powers(subset_powers_data, subset_powers_size_even)

  # Add the last component: + |A + B + C| for an even number of components, - |A + B + C + D| for an odd one.
  sum_of_all_subset_powers + (subset_powers_size_even ? power_of_union_of_all_metrics : -power_of_union_of_all_metrics)
end
# Sums the per-subset-size components of the inclusion-exclusion equation.
#
# subset_powers_data      - one value per subset size, in increasing size order
# subset_powers_size_even - whether the number of those values is even
#
# Values are added with alternating signs, starting with a plus; the whole
# sum is negated when subset_powers_size_even is truthy.
def sum_subset_powers(subset_powers_data, subset_powers_size_even)
  alternating_total = subset_powers_data.each_with_index.sum do |power, position|
    position.even? ? power : -power
  end

  subset_powers_size_even ? -alternating_total : alternating_total
end
# Calculates, for every subset size from 1 up to metric_names.size - 1, the
# sum of the powers of all subsets of that size. Returns one sum per size.
#
# Size 1 sums what the source reports for each metric alone; larger sizes sum
# the intersections of every combination of that many metrics. Each value is
# memoized in subset_powers_cache under its subset size and metric name(s).
def subsets_intersection_powers(source, metric_names, start_date, end_date, subset_powers_cache)
  (1...metric_names.size).map do |size|
    if size == 1
      # sum of powers of each set (metric) alone: |A + B + C + D| = (|A| + |B| + |C| + |D|) - ...
      metric_names.sum do |name|
        subset_powers_cache[size][name] ||=
          source.calculate_metrics_union(metric_names: name, start_date: start_date, end_date: end_date, recorded_at: recorded_at)
      end
    else
      # sum of powers of intersection of each subset with the given size: ... - (|A & B| + |A & C| + .. + |C & D|) ...
      metric_names.combination(size).sum do |combo|
        subset_powers_cache[size][combo.join('_&_')] ||=
          calculate_metrics_intersections(
            source: source,
            metric_names: combo,
            start_date: start_date,
            end_date: end_date,
            subset_powers_cache: subset_powers_cache
          )
      end
    end
  end
end
# Loads the definitions from every file matched by the wildcard and returns
# them as one flat array. Files that yield nothing contribute no entries.
def load_metrics(wildcard)
  Dir[wildcard].flat_map do |definition_path|
    [*load_yaml_from_path(definition_path)]
  end
end
# Parses the YAML file at path (aliases allowed) and converts each entry with
# with_indifferent_access. Returns nil when the document parses to nil.
def load_yaml_from_path(path)
  definitions = YAML.safe_load(File.read(path), aliases: true)
  return if definitions.nil?

  definitions.map { |definition| definition.with_indifferent_access }
end
end
end
end
end
end
|