1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
|
# frozen_string_literal: true
module Gitlab
module Database
module LoadBalancing
# Load balancing for ActiveRecord connections.
#
# Each host in the load balancer uses the same credentials as the primary
# database.
#
# This class *requires* that `ActiveRecord::Base.retrieve_connection`
# always returns a connection to the primary.
class LoadBalancer
CACHE_KEY = :gitlab_load_balancer_host
VALID_HOSTS_CACHE_KEY = :gitlab_load_balancer_valid_hosts
attr_reader :host_list
# hosts - The hostnames/addresses of the additional databases.
def initialize(hosts = [])
@host_list = HostList.new(hosts.map { |addr| Host.new(addr, self) })
@connection_db_roles = {}.compare_by_identity
@connection_db_roles_count = {}.compare_by_identity
end
# Yields a connection that can be used for reads.
#
# If no secondaries were available this method will use the primary
# instead.
def read(&block)
connection = nil
conflict_retried = 0
while host
ensure_caching!
begin
connection = host.connection
track_connection_role(connection, ROLE_REPLICA)
return yield connection
rescue StandardError => error
untrack_connection_role(connection)
if serialization_failure?(error)
# This error can occur when a query conflicts. See
# https://www.postgresql.org/docs/current/static/hot-standby.html#HOT-STANDBY-CONFLICT
# for more information.
#
# In this event we'll cycle through the secondaries at most 3
# times before using the primary instead.
will_retry = conflict_retried < @host_list.length * 3
LoadBalancing::Logger.warn(
event: :host_query_conflict,
message: 'Query conflict on host',
conflict_retried: conflict_retried,
will_retry: will_retry,
db_host: host.host,
db_port: host.port,
host_list_length: @host_list.length
)
if will_retry
conflict_retried += 1
release_host
else
break
end
elsif connection_error?(error)
host.offline!
release_host
else
raise error
end
end
end
LoadBalancing::Logger.warn(
event: :no_secondaries_available,
message: 'No secondaries were available, using primary instead',
conflict_retried: conflict_retried,
host_list_length: @host_list.length
)
read_write(&block)
ensure
untrack_connection_role(connection)
end
# Yields a connection that can be used for both reads and writes.
def read_write
connection = nil
# In the event of a failover the primary may be briefly unavailable.
# Instead of immediately grinding to a halt we'll retry the operation
# a few times.
retry_with_backoff do
connection = ActiveRecord::Base.retrieve_connection
track_connection_role(connection, ROLE_PRIMARY)
yield connection
end
ensure
untrack_connection_role(connection)
end
# Recognize the role (primary/replica) of the database this connection
# is connecting to. If the connection is not issued by this load
# balancer, return nil
def db_role_for_connection(connection)
return @connection_db_roles[connection] if @connection_db_roles[connection]
return ROLE_REPLICA if @host_list.manage_pool?(connection.pool)
return ROLE_PRIMARY if connection.pool == ActiveRecord::Base.connection_pool
end
# Returns a host to use for queries.
#
# Hosts are scoped per thread so that multiple threads don't
# accidentally re-use the same host + connection.
def host
RequestStore[CACHE_KEY] ||= current_host_list.next
end
# Releases the host and connection for the current thread.
def release_host
if host = RequestStore[CACHE_KEY]
host.disable_query_cache!
host.release_connection
end
RequestStore.delete(CACHE_KEY)
RequestStore.delete(VALID_HOSTS_CACHE_KEY)
end
def release_primary_connection
ActiveRecord::Base.connection_pool.release_connection
end
# Returns the transaction write location of the primary.
def primary_write_location
location = read_write do |connection|
::Gitlab::Database.get_write_location(connection)
end
return location if location
raise 'Failed to determine the write location of the primary database'
end
# Returns true if all hosts have caught up to the given transaction
# write location.
def all_caught_up?(location)
@host_list.hosts.all? { |host| host.caught_up?(location) }
end
# Returns true if there was at least one host that has caught up with the given transaction.
#
# In case of a retry, this method also stores the set of hosts that have caught up.
def select_caught_up_hosts(location)
all_hosts = @host_list.hosts
valid_hosts = all_hosts.select { |host| host.caught_up?(location) }
return false if valid_hosts.empty?
# Hosts can come online after the time when this scan was done,
# so we need to remember the ones that can be used. If the host went
# offline, we'll just rely on the retry mechanism to use the primary.
set_consistent_hosts_for_request(HostList.new(valid_hosts))
# Since we will be using a subset from the original list, let's just
# pick a random host and mix up the original list to ensure we don't
# only end up using one replica.
RequestStore[CACHE_KEY] = valid_hosts.sample
@host_list.shuffle
true
end
# Returns true if there was at least one host that has caught up with the given transaction.
# Similar to `#select_caught_up_hosts`, picks a random host, to rotate replicas we use.
# Unlike `#select_caught_up_hosts`, does not iterate over all hosts if finds any.
def select_up_to_date_host(location)
all_hosts = @host_list.hosts.shuffle
host = all_hosts.find { |host| host.caught_up?(location) }
return false unless host
RequestStore[CACHE_KEY] = host
true
end
def set_consistent_hosts_for_request(hosts)
RequestStore[VALID_HOSTS_CACHE_KEY] = hosts
end
# Yields a block, retrying it upon error using an exponential backoff.
def retry_with_backoff(retries = 3, time = 2)
retried = 0
last_error = nil
while retried < retries
begin
return yield
rescue StandardError => error
raise error unless connection_error?(error)
# We need to release the primary connection as otherwise Rails
# will keep raising errors when using the connection.
release_primary_connection
last_error = error
sleep(time)
retried += 1
time **= 2
end
end
raise last_error
end
def connection_error?(error)
case error
when ActiveRecord::StatementInvalid, ActionView::Template::Error
# After connecting to the DB Rails will wrap query errors using this
# class.
connection_error?(error.cause)
when *CONNECTION_ERRORS
true
else
# When PG tries to set the client encoding but fails due to a
# connection error it will raise a PG::Error instance. Catching that
# would catch all errors (even those we don't want), so instead we
# check for the message of the error.
error.message.start_with?('invalid encoding name:')
end
end
def serialization_failure?(error)
if error.cause
serialization_failure?(error.cause)
else
error.is_a?(PG::TRSerializationFailure)
end
end
private
def ensure_caching!
host.enable_query_cache! unless host.query_cache_enabled
end
def track_connection_role(connection, role)
@connection_db_roles[connection] = role
@connection_db_roles_count[connection] ||= 0
@connection_db_roles_count[connection] += 1
end
def untrack_connection_role(connection)
return if connection.blank? || @connection_db_roles_count[connection].blank?
@connection_db_roles_count[connection] -= 1
if @connection_db_roles_count[connection] <= 0
@connection_db_roles.delete(connection)
@connection_db_roles_count.delete(connection)
end
end
def current_host_list
RequestStore[VALID_HOSTS_CACHE_KEY] || @host_list
end
end
end
end
end
|