1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
|
# frozen_string_literal: true
module Gitlab
module Database
module LoadBalancing
# A single database host used for load balancing.
class Host
attr_reader :pool, :last_checked_at, :intervals, :load_balancer, :host, :port

# Connection checkout/release and the query cache toggles are forwarded to
# the connection pool.
delegate :connection,
  :release_connection,
  :enable_query_cache!,
  :disable_query_cache!,
  :query_cache_enabled,
  to: :pool

# Exception classes this class rescues when querying a host. PG::Error is
# only part of the list when the PG constant is defined.
CONNECTION_ERRORS = [
  ActionView::Template::Error,
  ActiveRecord::StatementInvalid
].tap { |errors| errors << PG::Error if defined?(PG) }.freeze
# Sets up the host and creates its replica connection pool.
#
# host - The address of the database.
# load_balancer - The LoadBalancer that manages this Host.
# port - The port of the database (defaults to nil).
def initialize(host, load_balancer, port: nil)
  @host = host
  @port = port
  @load_balancer = load_balancer
  @pool = load_balancer.create_replica_connection_pool(
    ::Gitlab::Database::LoadBalancing.pool_size,
    host,
    port
  )

  # A new host starts out online and counts as checked "just now".
  @online = true
  @last_checked_at = Time.zone.now

  # Candidate delays between status checks: every 0.5 step from the
  # configured interval up to twice that interval.
  base_interval = ::Gitlab::Database::LoadBalancing.replica_check_interval
  @intervals = (base_interval..(base_interval * 2)).step(0.5).to_a
end
# Disconnects the pool once none of its connections are in use, or once the
# timeout has elapsed, whichever comes first.
#
# timeout - The time (in seconds) after which the pool is forcefully
#           disconnected.
def disconnect!(timeout: 120)
  started_at = ::Gitlab::Metrics::System.monotonic_time

  # Poll every two seconds until the pool is idle or we run out of time.
  until (::Gitlab::Metrics::System.monotonic_time - started_at) > timeout
    break unless pool.connections.any?(&:in_use?)

    sleep(2)
  end

  pool.disconnect!
end
# Logs a warning, flags the host as offline and disconnects its pool.
def offline!
  log_fields = {
    event: :host_offline,
    message: 'Marking host as offline',
    db_host: @host,
    db_port: @port
  }
  ::Gitlab::Database::LoadBalancing::Logger.warn(**log_fields)

  @online = false
  @pool.disconnect!
end
# Returns true if the host is online.
#
# The cached status is returned as-is unless a status check is due. When
# one is due the status is refreshed and the outcome is logged. If any of
# CONNECTION_ERRORS is raised, the host is marked offline and false is
# returned.
def online?
  return @online unless check_replica_status?

  refresh_status

  logger = ::Gitlab::Database::LoadBalancing::Logger
  db_fields = { db_host: @host, db_port: @port }

  if @online
    logger.info(event: :host_online, message: 'Host is online after replica status check', **db_fields)
  else
    logger.warn(event: :host_offline, message: 'Host is offline after replica status check', **db_fields)
  end

  @online
rescue *CONNECTION_ERRORS
  offline!
  false
end
# Re-evaluates whether the replica is up to date and records when that check
# happened. Returns the recorded timestamp.
def refresh_status
  @online = replica_is_up_to_date?

  checked_at = Time.zone.now
  @last_checked_at = checked_at
end
# Returns true when the time since the last status check is at least one
# randomly sampled entry of `intervals`.
def check_replica_status?
  elapsed = Time.zone.now - last_checked_at

  elapsed >= intervals.sample
end
# Returns a truthy value when the replication lag time is below the
# threshold or, failing that, when the lag size is small enough. The size
# check only runs if the time check did not pass.
def replica_is_up_to_date?
  lag_check = replication_lag_below_threshold?
  return lag_check if lag_check

  data_is_recent_enough?
end
# Returns true if a replication lag time is available and does not exceed
# the configured maximum replication lag time; false otherwise.
def replication_lag_below_threshold?
  lag = replication_lag_time
  return false unless lag

  lag <= ::Gitlab::Database::LoadBalancing.max_replication_lag_time
end
# Returns true if the replica has replicated enough data to be useful.
#
# A replica that is fully caught up may still not replay any WAL data for a
# while, for example when the primary has not received writes recently. To
# avoid discarding such a replica, the lag size (in bytes) is compared with
# the configured maximum replication difference.
#
# Returns false when no lag size could be determined.
def data_is_recent_enough?
  lag_bytes = replication_lag_size
  return false unless lag_bytes

  lag_bytes <= ::Gitlab::Database::LoadBalancing.max_replication_difference
end
# Returns the replication lag time of this secondary in seconds as a float.
#
# Returns nil when the query produced no row. Note that a row whose 'lag'
# value is nil is converted with #to_f and therefore yields 0.0.
def replication_lag_time
  result = query_and_release('SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::float as lag')
  return unless result.any?

  result['lag'].to_f
end
# Returns the number of bytes this secondary is lagging behind the
# primary.
#
# Returns nil if no lag size could be calculated: either the query produced
# no row, or one of CONNECTION_ERRORS was raised along the way.
def replication_lag_size
  # Ask the primary for its write location BEFORE touching the replica pool.
  # `connection` checks out a replica connection that is only handed back by
  # `query_and_release`; if the primary lookup raised after that checkout,
  # the rescue below would return nil with the replica connection still
  # checked out.
  write_location = primary_write_location
  location = connection.quote(write_location)

  row = query_and_release(<<-SQL.squish)
    SELECT pg_wal_lsn_diff(#{location}, pg_last_wal_replay_lsn())::float
      AS diff
  SQL

  row['diff'].to_i if row.any?
rescue *CONNECTION_ERRORS
  nil
end
# Returns the primary write location reported by the load balancer. The
# primary connection is released afterwards, even when the lookup raises.
def primary_write_location
  location = load_balancer.primary_write_location
  location
ensure
  load_balancer.release_primary_connection
end
# Returns the last WAL replay location of this host as text.
#
# Returns nil when the query produced no row or when one of
# CONNECTION_ERRORS was raised.
def database_replica_location
  replay_location_sql = <<~SQL.squish
    SELECT pg_last_wal_replay_lsn()::text AS location
  SQL

  result = query_and_release(replay_location_sql)
  return unless result.any?

  result['location']
rescue *CONNECTION_ERRORS
  nil
end
# Returns true if this host has caught up to the given transaction write
# location. Returns false when one of CONNECTION_ERRORS is raised.
#
# location - The transaction write location as reported by a primary.
def caught_up?(location)
  quoted_location = connection.quote(location)

  # On a primary, pg_last_wal_replay_lsn() returns NULL. The recovery check
  # makes such a host count as up to date.
  result = query_and_release(<<~SQL.squish)
    SELECT NOT pg_is_in_recovery()
      OR pg_wal_lsn_diff(pg_last_wal_replay_lsn(), #{quoted_location}) >= 0
      AS result
  SQL

  ::Gitlab::Utils.to_boolean(result['result'])
rescue *CONNECTION_ERRORS
  false
end
# Runs the given SQL and returns the first result row, or an empty Hash when
# there is no row. Any StandardError raised while querying is swallowed and
# also yields an empty Hash. The connection is released in every case.
#
# sql - The SQL query to run, as a String.
def query_and_release(sql)
  first_row = connection.select_all(sql).first
  first_row || {}
rescue StandardError
  {}
ensure
  release_connection
end
end
end
end
end
|