1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
# frozen_string_literal: true
module Gitlab
  module Database
    module LoadBalancing
      # Helpers for keeping a session on the primary database after a write
      # and for releasing it again once that is no longer required.
      #
      # The primary's write location is stored in the shared-state Redis under
      # a key derived from a namespace and an id (see `redis_key_for`). Later
      # calls compare that stored location against what the load balancer
      # reports in order to decide whether the primary must still be used.
      #
      # ## Examples
      #
      # Sticking a user to the primary:
      #
      #     Sticking.stick_if_necessary(:user, current_user.id)
      #
      # To unstick if possible, or continue using the primary otherwise:
      #
      #     Sticking.unstick_or_continue_sticking(:user, current_user.id)
      module Sticking
        # The number of seconds after which a session should stop reading
        # from the primary. Used as the expiry (`ex:`) of the stored write
        # location.
        EXPIRATION = 30

        # Sticks to the primary, but only when load balancing is enabled and
        # the current session has performed a write.
        #
        # Returns the result of `stick`, or nil when nothing was done.
        def self.stick_if_necessary(namespace, id)
          return unless LoadBalancing.enable?
          return unless Session.current.performed_write?

          stick(namespace, id)
        end

        # Checks whether we are caught up with all the work recorded for the
        # given namespace and id.
        #
        # Without a stored write location there is nothing to wait for, so
        # true is returned. Otherwise the load balancer's answer is returned,
        # and the stored location is removed when that answer is truthy.
        def self.all_caught_up?(namespace, id)
          write_location = last_write_location_for(namespace, id)
          return true unless write_location

          caught_up = load_balancer.all_caught_up?(write_location)
          unstick(namespace, id) if caught_up
          caught_up
        end

        # Selects hosts that have caught up with the primary. This ensures
        # atomic selection of the host to prevent the host list changing
        # in another thread.
        #
        # Returns the load balancer's selection result (truthy if a host was
        # selected), or false when no write location is stored.
        def self.select_caught_up_replicas(namespace, id)
          write_location = last_write_location_for(namespace, id)

          # Unlike all_caught_up?, a missing write location yields false. We
          # want to be sure we talk to a replica that has caught up with a
          # specific write location; without one, err on the side of caution.
          return false unless write_location

          selected = load_balancer.select_caught_up_hosts(write_location)
          unstick(namespace, id) if selected
          selected
        end

        # Unsticks the object when everything is caught up (see
        # `all_caught_up?`); otherwise keeps the session on the primary.
        #
        # Returns nil when caught up, else the result of `use_primary!`.
        def self.unstick_or_continue_sticking(namespace, id)
          return if all_caught_up?(namespace, id)

          Session.current.use_primary!
        end

        # Selects a replica that has caught up with the primary. If none was
        # selected, the session is told to use the primary instead.
        #
        # Returns nil when a replica was selected, else the result of
        # `use_primary!`.
        def self.select_valid_host(namespace, id)
          return if select_caught_up_replicas(namespace, id)

          Session.current.use_primary!
        end

        # Starts sticking to the primary for the given namespace and id,
        # recording the latest write location obtained from the primary.
        #
        # Does nothing (returns nil) when load balancing is not enabled.
        def self.stick(namespace, id)
          return unless LoadBalancing.enable?

          mark_primary_write_location(namespace, id)
          Session.current.use_primary!
        end

        # Same as `stick`, for several ids at once: the write location is
        # looked up a single time and stored under every id of `ids`.
        #
        # Does nothing (returns nil) when load balancing is not enabled.
        def self.bulk_stick(namespace, ids)
          return unless LoadBalancing.enable?

          with_primary_write_location do |location|
            ids.each { |id| set_write_location_for(namespace, id, location) }
          end

          Session.current.use_primary!
        end

        # Yields the current write location of the primary to the block.
        #
        # Nothing is yielded (and nil is returned) when load balancing is not
        # configured or when the location is blank. Otherwise the block's
        # result is returned.
        def self.with_primary_write_location
          return unless LoadBalancing.configured?

          # Load balancing could be enabled for the Web application server,
          # but it's not activated for Sidekiq. We should update Redis with
          # the write location just in case load balancing is being used.
          write_location =
            if LoadBalancing.enable?
              load_balancer.primary_write_location
            else
              Gitlab::Database.get_write_location(ActiveRecord::Base.connection)
            end

          yield(write_location) unless write_location.blank?
        end

        # Stores the primary's current write location for the given namespace
        # and id, provided `with_primary_write_location` yields one.
        def self.mark_primary_write_location(namespace, id)
          with_primary_write_location { |location| set_write_location_for(namespace, id, location) }
        end

        # Stops sticking to the primary by deleting the stored write location.
        def self.unstick(namespace, id)
          key = redis_key_for(namespace, id)

          Gitlab::Redis::SharedState.with { |redis| redis.del(key) }
        end

        # Stores `location` for the given namespace and id; the entry expires
        # after EXPIRATION seconds.
        def self.set_write_location_for(namespace, id, location)
          key = redis_key_for(namespace, id)

          Gitlab::Redis::SharedState.with { |redis| redis.set(key, location, ex: EXPIRATION) }
        end

        # Returns the write location stored for the given namespace and id
        # (whatever Redis GET returns for the key).
        def self.last_write_location_for(namespace, id)
          key = redis_key_for(namespace, id)

          Gitlab::Redis::SharedState.with { |redis| redis.get(key) }
        end

        # Builds the Redis key under which the write location for the given
        # namespace and id is stored.
        def self.redis_key_for(namespace, id)
          "database-load-balancing/write-location/#{namespace}/#{id}"
        end

        # The load balancer exposed by the load balancing proxy.
        def self.load_balancer
          LoadBalancing.proxy.load_balancer
        end
      end
    end
  end
end
|