1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
# frozen_string_literal: true
module Gitlab
module Issues
module Rebalancing
class State
REDIS_KEY_PREFIX = "gitlab:issues-position-rebalances"
CONCURRENT_RUNNING_REBALANCES_KEY = "#{REDIS_KEY_PREFIX}:running_rebalances"
RECENTLY_FINISHED_REBALANCE_PREFIX = "#{REDIS_KEY_PREFIX}:recently_finished"
REDIS_EXPIRY_TIME = 10.days
MAX_NUMBER_OF_CONCURRENT_REBALANCES = 5
NAMESPACE = 1
PROJECT = 2
def initialize(root_namespace, projects)
@root_namespace = root_namespace
@projects = projects
@rebalanced_container_type = @root_namespace.is_a?(Group) ? NAMESPACE : PROJECT
@rebalanced_container_id = @rebalanced_container_type == NAMESPACE ? @root_namespace.id : projects.take.id # rubocop:disable CodeReuse/ActiveRecord
end
def track_new_running_rebalance
with_redis do |redis|
redis.multi do |multi|
# we trigger re-balance for namespaces(groups) or specific user project
value = "#{rebalanced_container_type}/#{rebalanced_container_id}"
multi.sadd(CONCURRENT_RUNNING_REBALANCES_KEY, value)
multi.expire(CONCURRENT_RUNNING_REBALANCES_KEY, REDIS_EXPIRY_TIME)
end
end
end
def concurrent_running_rebalances_count
with_redis { |redis| redis.scard(CONCURRENT_RUNNING_REBALANCES_KEY).to_i }
end
def rebalance_in_progress?
is_running = case rebalanced_container_type
when NAMESPACE
namespace_ids = self.class.current_rebalancing_containers.map { |string| string.split("#{NAMESPACE}/").second.to_i }.compact
namespace_ids.include?(root_namespace.id)
when PROJECT
project_ids = self.class.current_rebalancing_containers.map { |string| string.split("#{PROJECT}/").second.to_i }.compact
project_ids.include?(projects.take.id) # rubocop:disable CodeReuse/ActiveRecord
else
false
end
refresh_keys_expiration if is_running
is_running
end
def can_start_rebalance?
rebalance_in_progress? || too_many_rebalances_running?
end
def cache_issue_ids(issue_ids)
with_redis do |redis|
values = issue_ids.map { |issue| [issue.relative_position, issue.id] }
redis.multi do |multi|
multi.zadd(issue_ids_key, values) unless values.blank?
multi.expire(issue_ids_key, REDIS_EXPIRY_TIME)
end
end
end
def get_cached_issue_ids(index, limit)
with_redis do |redis|
redis.zrange(issue_ids_key, index, index + limit - 1)
end
end
def cache_current_index(index)
with_redis { |redis| redis.set(current_index_key, index, ex: REDIS_EXPIRY_TIME) }
end
def get_current_index
with_redis { |redis| redis.get(current_index_key).to_i }
end
def cache_current_project_id(project_id)
with_redis { |redis| redis.set(current_project_key, project_id, ex: REDIS_EXPIRY_TIME) }
end
def get_current_project_id
with_redis { |redis| redis.get(current_project_key) }
end
def issue_count
@issue_count ||= with_redis { |redis| redis.zcard(issue_ids_key) }
end
def remove_current_project_id_cache
with_redis { |redis| redis.del(current_project_key) }
end
def refresh_keys_expiration
with_redis do |redis|
redis.multi do |multi|
multi.expire(issue_ids_key, REDIS_EXPIRY_TIME)
multi.expire(current_index_key, REDIS_EXPIRY_TIME)
multi.expire(current_project_key, REDIS_EXPIRY_TIME)
multi.expire(CONCURRENT_RUNNING_REBALANCES_KEY, REDIS_EXPIRY_TIME)
end
end
end
def cleanup_cache
value = "#{rebalanced_container_type}/#{rebalanced_container_id}"
with_redis do |redis|
redis.multi do |multi|
multi.del(issue_ids_key)
multi.del(current_index_key)
multi.del(current_project_key)
multi.srem(CONCURRENT_RUNNING_REBALANCES_KEY, value)
multi.set(self.class.recently_finished_key(rebalanced_container_type, rebalanced_container_id), true, ex: 1.hour)
end
end
end
def self.rebalance_recently_finished?(project_id, namespace_id)
container_id = project_id || namespace_id
container_type = project_id.present? ? PROJECT : NAMESPACE
Gitlab::Redis::SharedState.with { |redis| redis.get(recently_finished_key(container_type, container_id)) }
end
def self.fetch_rebalancing_groups_and_projects
namespace_ids = []
project_ids = []
current_rebalancing_containers.each do |string|
container_type, container_id = string.split('/', 2).map(&:to_i)
if container_type == NAMESPACE
namespace_ids << container_id
elsif container_type == PROJECT
project_ids << container_id
end
end
[namespace_ids, project_ids]
end
private
def self.current_rebalancing_containers
Gitlab::Redis::SharedState.with { |redis| redis.smembers(CONCURRENT_RUNNING_REBALANCES_KEY) }
end
attr_accessor :root_namespace, :projects, :rebalanced_container_type, :rebalanced_container_id
def too_many_rebalances_running?
concurrent_running_rebalances_count <= MAX_NUMBER_OF_CONCURRENT_REBALANCES
end
def issue_ids_key
"#{REDIS_KEY_PREFIX}:#{root_namespace.id}"
end
def current_index_key
"#{issue_ids_key}:current_index"
end
def current_project_key
"#{issue_ids_key}:current_project_id"
end
def self.recently_finished_key(container_type, container_id)
"#{RECENTLY_FINISHED_REBALANCE_PREFIX}:#{container_type}:#{container_id}"
end
def with_redis(&blk)
Gitlab::Redis::SharedState.with(&blk) # rubocop: disable CodeReuse/ActiveRecord
end
end
end
end
end
|