summaryrefslogtreecommitdiff
path: root/lib/gitlab/database/load_balancing
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/database/load_balancing')
-rw-r--r--lib/gitlab/database/load_balancing/active_record_proxy.rb15
-rw-r--r--lib/gitlab/database/load_balancing/connection_proxy.rb140
-rw-r--r--lib/gitlab/database/load_balancing/host.rb209
-rw-r--r--lib/gitlab/database/load_balancing/host_list.rb99
-rw-r--r--lib/gitlab/database/load_balancing/load_balancer.rb275
-rw-r--r--lib/gitlab/database/load_balancing/logger.rb13
-rw-r--r--lib/gitlab/database/load_balancing/rack_middleware.rb98
-rw-r--r--lib/gitlab/database/load_balancing/resolver.rb52
-rw-r--r--lib/gitlab/database/load_balancing/service_discovery.rb187
-rw-r--r--lib/gitlab/database/load_balancing/session.rb118
-rw-r--r--lib/gitlab/database/load_balancing/sidekiq_client_middleware.rb46
-rw-r--r--lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb71
-rw-r--r--lib/gitlab/database/load_balancing/srv_resolver.rb46
-rw-r--r--lib/gitlab/database/load_balancing/sticking.rb147
14 files changed, 1516 insertions, 0 deletions
diff --git a/lib/gitlab/database/load_balancing/active_record_proxy.rb b/lib/gitlab/database/load_balancing/active_record_proxy.rb
new file mode 100644
index 00000000000..7763497e770
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/active_record_proxy.rb
@@ -0,0 +1,15 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ # Module injected into ActiveRecord::Base to allow hijacking of the
+ # "connection" method.
+ module ActiveRecordProxy
+ # Returns whatever LoadBalancing.proxy provides, in place of a regular
+ # connection object.
+ def connection
+ LoadBalancing.proxy
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/connection_proxy.rb b/lib/gitlab/database/load_balancing/connection_proxy.rb
new file mode 100644
index 00000000000..3a09689a724
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/connection_proxy.rb
@@ -0,0 +1,140 @@
+# frozen_string_literal: true
+
+# rubocop:disable GitlabSecurity/PublicSend
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ # Redirecting of ActiveRecord connections.
+ #
+ # The ConnectionProxy class redirects ActiveRecord connection requests to
+ # the right load balancer pool, depending on the type of query.
+ class ConnectionProxy
+ # Raised by #write_using_load_balancer when a write is attempted while
+ # the current thread is flagged as being inside a read-only transaction.
+ WriteInsideReadOnlyTransactionError = Class.new(StandardError)
+ # Thread.current key holding the read-only transaction flag.
+ # NOTE(review): the symbol is spelled "balacing"; in the visible code it
+ # is only used through this constant, but confirm before renaming.
+ READ_ONLY_TRANSACTION_KEY = :load_balacing_read_only_transaction
+
+ attr_reader :load_balancer
+
+ # These methods perform writes after which we need to stick to the
+ # primary.
+ STICKY_WRITES = %i(
+ delete
+ delete_all
+ insert
+ update
+ update_all
+ ).freeze
+
+ # These methods are forwarded through #read_using_load_balancer and do
+ # not make the session stick to the primary.
+ NON_STICKY_READS = %i(
+ sanitize_limit
+ select
+ select_one
+ select_rows
+ quote_column_name
+ ).freeze
+
+ # hosts - The hosts to use for load balancing.
+ def initialize(hosts = [])
+ # Every read and write issued through this proxy goes via this balancer.
+ @load_balancer = LoadBalancer.new(hosts)
+ end
+
+ # Routes SELECT queries: locking reads (SELECT ... FOR UPDATE) are sent to
+ # the primary as a sticky write, everything else is treated as a read.
+ #
+ # NOTE(review): `preparable` is accepted but not forwarded to the
+ # connection; confirm this is intentional.
+ def select_all(arel, name = nil, binds = [], preparable: nil)
+   forwarded = [arel, name, binds]
+   locking_read = arel.respond_to?(:locked) && arel.locked
+
+   return read_using_load_balancer(:select_all, forwarded) unless locking_read
+
+   write_using_load_balancer(:select_all, forwarded, sticky: true)
+ end
+
+ # Defines one forwarding method per read-only operation; each call is
+ # dispatched through #read_using_load_balancer.
+ NON_STICKY_READS.each do |name|
+ define_method(name) do |*args, &block|
+ read_using_load_balancer(name, args, &block)
+ end
+ end
+
+ # Defines one forwarding method per write operation; each call is
+ # dispatched through #write_using_load_balancer with sticking enabled.
+ STICKY_WRITES.each do |name|
+ define_method(name) do |*args, &block|
+ write_using_load_balancer(name, args, sticky: true, &block)
+ end
+ end
+
+ # Runs a transaction through the load balancer.
+ #
+ # When the session falls back to replicas for ambiguous queries the
+ # transaction is treated as read-only: the current thread is flagged (so
+ # writes issued inside it raise WriteInsideReadOnlyTransactionError) and
+ # the call goes through #read_using_load_balancer. Otherwise it is
+ # executed as a sticky write.
+ #
+ # args  - Arguments forwarded to the connection's `transaction` method.
+ # block - The transaction body.
+ #
+ # Returns whatever the underlying `transaction` call returns.
+ def transaction(*args, &block)
+   # Remember whether an enclosing call already set the flag, so that a
+   # nested transaction does not clear it while the outer read-only
+   # transaction is still running.
+   already_read_only = read_only_transaction?
+
+   if current_session.fallback_to_replicas_for_ambiguous_queries?
+     track_read_only_transaction!
+     read_using_load_balancer(:transaction, args, &block)
+   else
+     write_using_load_balancer(:transaction, args, sticky: true, &block)
+   end
+ ensure
+   untrack_read_only_transaction! unless already_read_only
+ end
+
+ # Delegates all unknown messages to the load balancer: through a read when
+ # the session falls back to replicas for ambiguous queries, and through a
+ # (non-sticky) write otherwise.
+ #
+ # NOTE(review): this class defines no matching respond_to_missing?.
+ def method_missing(name, *args, &block)
+   use_replicas = current_session.fallback_to_replicas_for_ambiguous_queries?
+   return read_using_load_balancer(name, args, &block) if use_replicas
+
+   write_using_load_balancer(name, args, &block)
+ end
+
+ # Performs a read using the load balancer.
+ #
+ # name - The name of the method to call on a connection object.
+ def read_using_load_balancer(name, args, &block)
+   session = current_session
+
+   # Reads only stay on the primary when the session uses the primary and
+   # has not opted into replicas for read queries.
+   stick_to_primary = session.use_primary? && !session.use_replicas_for_read_queries?
+   balancer_method = stick_to_primary ? :read_write : :read
+
+   @load_balancer.public_send(balancer_method) do |connection|
+     connection.send(name, *args, &block)
+   end
+ end
+
+ # Performs a write using the load balancer.
+ #
+ # name - The name of the method to call on a connection object.
+ # sticky - If set to true the session will stick to the master after
+ # the write.
+ def write_using_load_balancer(name, args, sticky: false, &block)
+   writable = !read_only_transaction?
+   raise(WriteInsideReadOnlyTransactionError, 'A write query is performed inside a read-only transaction') unless writable
+
+   @load_balancer.read_write do |connection|
+     # Stick before dispatching: code running inside the block must
+     # already see the session as sticky, otherwise it could still be
+     # served by a secondary instead of the primary.
+     current_session.write! if sticky
+
+     connection.send(name, *args, &block)
+   end
+ end
+
+ private
+
+ # The load balancing session returned by Session.current.
+ def current_session
+ ::Gitlab::Database::LoadBalancing::Session.current
+ end
+
+ # Flags the current thread as being inside a read-only transaction.
+ def track_read_only_transaction!
+ Thread.current[READ_ONLY_TRANSACTION_KEY] = true
+ end
+
+ # Clears the read-only transaction flag of the current thread.
+ def untrack_read_only_transaction!
+ Thread.current[READ_ONLY_TRANSACTION_KEY] = nil
+ end
+
+ # Returns true only when the flag of the current thread is exactly true.
+ def read_only_transaction?
+ Thread.current[READ_ONLY_TRANSACTION_KEY] == true
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/host.rb b/lib/gitlab/database/load_balancing/host.rb
new file mode 100644
index 00000000000..3e74b5ea727
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/host.rb
@@ -0,0 +1,209 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ # A single database host used for load balancing.
+ class Host
+ attr_reader :pool, :last_checked_at, :intervals, :load_balancer, :host, :port
+
+ # Connection handling and query cache control are forwarded to the pool.
+ delegate :connection, :release_connection, :enable_query_cache!, :disable_query_cache!, :query_cache_enabled, to: :pool
+
+ # Errors rescued by this class when querying the host. PG::Error is only
+ # included when the PG constant is defined.
+ CONNECTION_ERRORS =
+ if defined?(PG)
+ [
+ ActionView::Template::Error,
+ ActiveRecord::StatementInvalid,
+ PG::Error
+ ].freeze
+ else
+ [
+ ActionView::Template::Error,
+ ActiveRecord::StatementInvalid
+ ].freeze
+ end
+
+ # host - The address of the database.
+ # load_balancer - The LoadBalancer that manages this Host.
+ def initialize(host, load_balancer, port: nil)
+ @host = host
+ @port = port
+ @load_balancer = load_balancer
+ @pool = Database.create_connection_pool(LoadBalancing.pool_size, host, port)
+ # Hosts start out online; #online? re-evaluates this over time.
+ @online = true
+ @last_checked_at = Time.zone.now
+
+ # Candidate wait times between status checks: from the configured
+ # interval up to twice that value, in steps of 0.5.
+ # #check_replica_status? samples one of them on every call.
+ interval = LoadBalancing.replica_check_interval
+ @intervals = (interval..(interval * 2)).step(0.5).to_a
+ end
+
+ # Disconnects the pool, once all connections are no longer in use.
+ #
+ # timeout - The time after which the pool should be forcefully
+ # disconnected.
+ def disconnect!(timeout = 120)
+   waiting_since = Metrics::System.monotonic_time
+
+   # Poll every two seconds until no connection is in use or the timeout
+   # has elapsed, whichever comes first.
+   until (Metrics::System.monotonic_time - waiting_since) > timeout
+     break unless pool.connections.any?(&:in_use?)
+
+     sleep(2)
+   end
+
+   pool.disconnect!
+ end
+
+ # Logs a warning, marks the host as offline and disconnects its pool.
+ def offline!
+ LoadBalancing::Logger.warn(
+ event: :host_offline,
+ message: 'Marking host as offline',
+ db_host: @host,
+ db_port: @port
+ )
+
+ @online = false
+ @pool.disconnect!
+ end
+
+ # Returns true if the host is online.
+ def online?
+ # Between status checks, report the last known state.
+ return @online unless check_replica_status?
+
+ refresh_status
+
+ # The outcome of every status check is logged.
+ if @online
+ LoadBalancing::Logger.info(
+ event: :host_online,
+ message: 'Host is online after replica status check',
+ db_host: @host,
+ db_port: @port
+ )
+ else
+ LoadBalancing::Logger.warn(
+ event: :host_offline,
+ message: 'Host is offline after replica status check',
+ db_host: @host,
+ db_port: @port
+ )
+ end
+
+ @online
+ # A connection error raised during the check marks the host as offline.
+ rescue *CONNECTION_ERRORS
+ offline!
+ false
+ end
+
+ # Re-evaluates whether the replica is up to date and records the time of
+ # the check.
+ def refresh_status
+ @online = replica_is_up_to_date?
+ @last_checked_at = Time.zone.now
+ end
+
+ # Returns true when the time since the last check is at least one
+ # randomly sampled value from `intervals`.
+ def check_replica_status?
+ (Time.zone.now - last_checked_at) >= intervals.sample
+ end
+
+ # A replica counts as up to date when its lag time is below the threshold
+ # or, failing that, when its lag size is small enough.
+ def replica_is_up_to_date?
+ replication_lag_below_threshold? || data_is_recent_enough?
+ end
+
+ # Returns true when a lag time could be determined and it does not exceed
+ # LoadBalancing.max_replication_lag_time; false otherwise.
+ def replication_lag_below_threshold?
+   lag_time = replication_lag_time
+   return false unless lag_time
+
+   lag_time <= LoadBalancing.max_replication_lag_time
+ end
+
+ # Returns true if the replica has replicated enough data to be useful.
+ def data_is_recent_enough?
+   # A replica may not replay WAL data for a while despite being up to
+   # date, for example when the primary receives no writes. In that case
+   # the lag time alone is misleading, so the lag size (in bytes) is
+   # compared against the allowed difference instead. Determining the lag
+   # size requires the primary, which is why this check is only a
+   # fallback.
+   lag_size = replication_lag_size
+   return false unless lag_size
+
+   lag_size <= LoadBalancing.max_replication_difference
+ end
+
+ # Returns the replication lag time of this secondary in seconds as a
+ # float.
+ #
+ # This method will return nil if no lag time could be calculated.
+ def replication_lag_time
+ row = query_and_release('SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::float as lag')
+
+ # NOTE(review): Hash#any? is true for every non-empty row, so a row whose
+ # 'lag' value is nil yields 0.0 here (nil.to_f) rather than nil.
+ row['lag'].to_f if row.any?
+ end
+
+ # Returns the number of bytes this secondary is lagging behind the
+ # primary.
+ #
+ # This method will return nil if no lag size could be calculated.
+ def replication_lag_size
+ # The primary's write location is quoted using this host's connection.
+ location = connection.quote(primary_write_location)
+ row = query_and_release(<<-SQL.squish)
+ SELECT pg_wal_lsn_diff(#{location}, pg_last_wal_replay_lsn())::float
+ AS diff
+ SQL
+
+ # NOTE(review): a nil 'diff' value becomes 0 (nil.to_i) because Hash#any?
+ # is true for every non-empty row.
+ row['diff'].to_i if row.any?
+ rescue *CONNECTION_ERRORS
+ nil
+ end
+
+ # Asks the load balancer for the primary's write location and always
+ # releases the primary connection afterwards.
+ def primary_write_location
+ load_balancer.primary_write_location
+ ensure
+ load_balancer.release_primary_connection
+ end
+
+ # Returns the last WAL replay location of this host as text, or nil when
+ # the query produced no row or a connection error was raised.
+ def database_replica_location
+ row = query_and_release(<<-SQL.squish)
+ SELECT pg_last_wal_replay_lsn()::text AS location
+ SQL
+
+ row['location'] if row.any?
+ rescue *CONNECTION_ERRORS
+ nil
+ end
+
+ # Returns true if this host has caught up to the given transaction
+ # write location.
+ #
+ # location - The transaction write location as reported by a primary.
+ def caught_up?(location)
+ string = connection.quote(location)
+
+ # In case the host is a primary pg_last_wal_replay_lsn/pg_last_xlog_replay_location() returns
+ # NULL. The recovery check ensures we treat the host as up-to-date in
+ # such a case.
+ query = <<-SQL.squish
+ SELECT NOT pg_is_in_recovery()
+ OR pg_wal_lsn_diff(pg_last_wal_replay_lsn(), #{string}) >= 0
+ AS result
+ SQL
+
+ row = query_and_release(query)
+
+ # The raw 'result' value is converted with Gitlab::Utils.to_boolean.
+ ::Gitlab::Utils.to_boolean(row['result'])
+ # A host that cannot be queried is reported as not caught up.
+ rescue *CONNECTION_ERRORS
+ false
+ end
+
+ # Runs the given SQL on this host and returns the first row, or an empty
+ # Hash when there is no row or when any StandardError is raised. The
+ # connection is released afterwards in every case.
+ def query_and_release(sql)
+ connection.select_all(sql).first || {}
+ rescue StandardError
+ {}
+ ensure
+ release_connection
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/host_list.rb b/lib/gitlab/database/load_balancing/host_list.rb
new file mode 100644
index 00000000000..24800012947
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/host_list.rb
@@ -0,0 +1,99 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ # A list of database hosts to use for connections.
+ class HostList
+ # hosts - The list of secondary hosts to add.
+ def initialize(hosts = [])
+ # The given hosts are stored in shuffled order.
+ @hosts = hosts.shuffle
+ @pools = Set.new
+ # Position of the host that #next_host inspects first.
+ @index = 0
+ @mutex = Mutex.new
+ @hosts_gauge = Gitlab::Metrics.gauge(:db_load_balancing_hosts, 'Current number of load balancing hosts')
+
+ set_metrics!
+ update_pools
+ end
+
+ # Returns a copy of the host array, taken while holding the mutex.
+ def hosts
+ @mutex.synchronize { @hosts.dup }
+ end
+
+ # Reshuffles the hosts and resets the rotation index.
+ def shuffle
+ @mutex.synchronize do
+ unsafe_shuffle
+ end
+ end
+
+ # Returns the number of hosts in the list.
+ def length
+ @mutex.synchronize { @hosts.length }
+ end
+
+ # Returns an Array of [host, port] pairs, one per host.
+ def host_names_and_ports
+ @mutex.synchronize { @hosts.map { |host| [host.host, host.port] } }
+ end
+
+ # Returns true if the given pool belongs to one of the hosts in this list.
+ # NOTE(review): reads @pools without taking @mutex, unlike the readers
+ # above; confirm this is intentional.
+ def manage_pool?(pool)
+ @pools.include?(pool)
+ end
+
+ # Replaces the hosts: the new list is shuffled, the rotation index is
+ # reset and the set of managed pools is rebuilt, all under the mutex. The
+ # gauge is updated after the mutex has been released.
+ def hosts=(hosts)
+ @mutex.synchronize do
+ @hosts = hosts
+ unsafe_shuffle
+ update_pools
+ end
+
+ set_metrics!
+ end
+
+ # Sets metrics before returning next host
+ def next
+   selected = next_host
+   set_metrics!
+   selected
+ end
+
+ private
+
+ # Shuffles the hosts into a new array and resets the rotation index. It
+ # takes no lock itself ("unsafe"); the visible callers hold @mutex.
+ def unsafe_shuffle
+ @hosts = @hosts.shuffle
+ @index = 0
+ end
+
+ # Returns the next available host.
+ #
+ # Returns a Gitlab::Database::LoadBalancing::Host instance, or nil if no
+ # hosts were available.
+ def next_host
+ @mutex.synchronize do
+ # `break` leaves the synchronize block, making the method return nil.
+ break if @hosts.empty?
+
+ # Remember where this scan started so a full cycle can be detected.
+ started_at = @index
+
+ loop do
+ host = @hosts[@index]
+ # Advance (wrapping around) before checking the host, so the next call
+ # starts with the following host.
+ @index = (@index + 1) % @hosts.length
+
+ break host if host.online?
+
+ # Return nil once we have cycled through all hosts and none were
+ # available.
+ break if @index == started_at
+ end
+ end
+ end
+
+ # Publishes the current number of hosts to the gauge.
+ # NOTE(review): reads @hosts without taking @mutex; confirm this is fine
+ # for the callers that invoke it outside the lock.
+ def set_metrics!
+ @hosts_gauge.set({}, @hosts.length)
+ end
+
+ # Rebuilds the set of pools that belong to the current hosts.
+ def update_pools
+ @pools = Set.new(@hosts.map(&:pool))
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/load_balancer.rb b/lib/gitlab/database/load_balancing/load_balancer.rb
new file mode 100644
index 00000000000..a833bb8491f
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/load_balancer.rb
@@ -0,0 +1,275 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ # Load balancing for ActiveRecord connections.
+ #
+ # Each host in the load balancer uses the same credentials as the primary
+ # database.
+ #
+ # This class *requires* that `ActiveRecord::Base.retrieve_connection`
+ # always returns a connection to the primary.
+ class LoadBalancer
+ # RequestStore key under which the currently selected host is cached.
+ CACHE_KEY = :gitlab_load_balancer_host
+ # RequestStore key for an optional HostList that restricts host selection
+ # (set through #set_consistent_hosts_for_request).
+ VALID_HOSTS_CACHE_KEY = :gitlab_load_balancer_valid_hosts
+
+ attr_reader :host_list
+
+ # hosts - The hostnames/addresses of the additional databases.
+ def initialize(hosts = [])
+ @host_list = HostList.new(hosts.map { |addr| Host.new(addr, self) })
+ # Both hashes compare their keys (connections) by object identity.
+ @connection_db_roles = {}.compare_by_identity
+ @connection_db_roles_count = {}.compare_by_identity
+ end
+
+ # Yields a connection that can be used for reads.
+ #
+ # If no secondaries were available this method will use the primary
+ # instead.
+ def read(&block)
+ connection = nil
+ conflict_retried = 0
+
+ # `host` is memoized in RequestStore; #release_host clears it, so the
+ # next iteration asks the host list for another host. The loop ends once
+ # no host is returned.
+ while host
+ ensure_caching!
+
+ begin
+ connection = host.connection
+ track_connection_role(connection, ROLE_REPLICA)
+
+ return yield connection
+ rescue StandardError => error
+ untrack_connection_role(connection)
+
+ if serialization_failure?(error)
+ # This error can occur when a query conflicts. See
+ # https://www.postgresql.org/docs/current/static/hot-standby.html#HOT-STANDBY-CONFLICT
+ # for more information.
+ #
+ # In this event we'll cycle through the secondaries at most 3
+ # times before using the primary instead.
+ will_retry = conflict_retried < @host_list.length * 3
+
+ LoadBalancing::Logger.warn(
+ event: :host_query_conflict,
+ message: 'Query conflict on host',
+ conflict_retried: conflict_retried,
+ will_retry: will_retry,
+ db_host: host.host,
+ db_port: host.port,
+ host_list_length: @host_list.length
+ )
+
+ if will_retry
+ conflict_retried += 1
+ release_host
+ else
+ # Give up on the secondaries and fall through to the primary.
+ break
+ end
+ elsif connection_error?(error)
+ # The host is unreachable: take it out of rotation and try another.
+ host.offline!
+ release_host
+ else
+ # Anything else is not related to load balancing and is passed on.
+ raise error
+ end
+ end
+ end
+
+ LoadBalancing::Logger.warn(
+ event: :no_secondaries_available,
+ message: 'No secondaries were available, using primary instead',
+ conflict_retried: conflict_retried,
+ host_list_length: @host_list.length
+ )
+
+ read_write(&block)
+ ensure
+ untrack_connection_role(connection)
+ end
+
+ # Yields a connection that can be used for both reads and writes.
+ def read_write
+ connection = nil
+ # In the event of a failover the primary may be briefly unavailable.
+ # Instead of immediately grinding to a halt we'll retry the operation
+ # a few times.
+ retry_with_backoff do
+ connection = ActiveRecord::Base.retrieve_connection
+ track_connection_role(connection, ROLE_PRIMARY)
+
+ yield connection
+ end
+ ensure
+ # Runs once after the final attempt, for the last connection retrieved.
+ untrack_connection_role(connection)
+ end
+
+ # Recognize the role (primary/replica) of the database this connection
+ # is connecting to. If the connection is not issued by this load
+ # balancer, return nil
+ def db_role_for_connection(connection)
+   tracked_role = @connection_db_roles[connection]
+
+   # An explicitly tracked role wins; otherwise the role is derived from
+   # the pool the connection belongs to. Unknown connections yield nil.
+   if tracked_role
+     tracked_role
+   elsif @host_list.manage_pool?(connection.pool)
+     ROLE_REPLICA
+   elsif connection.pool == ActiveRecord::Base.connection_pool
+     ROLE_PRIMARY
+   end
+ end
+
+ # Returns a host to use for queries.
+ #
+ # Hosts are scoped per thread so that multiple threads don't
+ # accidentally re-use the same host + connection.
+ def host
+ # Memoized in RequestStore; the host list is only consulted when no host
+ # is cached yet.
+ RequestStore[CACHE_KEY] ||= current_host_list.next
+ end
+
+ # Releases the host and connection for the current thread.
+ def release_host
+   cached_host = RequestStore[CACHE_KEY]
+
+   if cached_host
+     cached_host.disable_query_cache!
+     cached_host.release_connection
+   end
+
+   # Forget both the selected host and any restricted host list.
+   RequestStore.delete(CACHE_KEY)
+   RequestStore.delete(VALID_HOSTS_CACHE_KEY)
+ end
+
+ # Releases the connection held on ActiveRecord::Base's connection pool.
+ def release_primary_connection
+ ActiveRecord::Base.connection_pool.release_connection
+ end
+
+ # Returns the transaction write location of the primary.
+ def primary_write_location
+   location = read_write { |connection| ::Gitlab::Database.get_write_location(connection) }
+
+   # A missing location is treated as an error rather than returned.
+   raise 'Failed to determine the write location of the primary database' unless location
+
+   location
+ end
+
+ # Returns true if all hosts have caught up to the given transaction
+ # write location.
+ def all_caught_up?(location)
+ # Works on the snapshot returned by HostList#hosts.
+ @host_list.hosts.all? { |host| host.caught_up?(location) }
+ end
+
+ # Returns true if there was at least one host that has caught up with the given transaction.
+ #
+ # In case of a retry, this method also stores the set of hosts that have caught up.
+ def select_caught_up_hosts(location)
+ all_hosts = @host_list.hosts
+ valid_hosts = all_hosts.select { |host| host.caught_up?(location) }
+
+ # Nothing is stored in RequestStore when no host has caught up.
+ return false if valid_hosts.empty?
+
+ # Hosts can come online after the time when this scan was done,
+ # so we need to remember the ones that can be used. If the host went
+ # offline, we'll just rely on the retry mechanism to use the primary.
+ set_consistent_hosts_for_request(HostList.new(valid_hosts))
+
+ # Since we will be using a subset from the original list, let's just
+ # pick a random host and mix up the original list to ensure we don't
+ # only end up using one replica.
+ RequestStore[CACHE_KEY] = valid_hosts.sample
+ @host_list.shuffle
+
+ true
+ end
+
+ # Returns true if there was at least one host that has caught up with the given transaction.
+ # Similar to `#select_caught_up_hosts`, picks a random host, to rotate replicas we use.
+ # Unlike `#select_caught_up_hosts`, does not iterate over all hosts if finds any.
+ def select_up_to_date_host(location)
+   # Shuffle first so the first caught-up host found is a random one.
+   candidate = @host_list.hosts.shuffle.find { |replica| replica.caught_up?(location) }
+   return false if candidate.nil?
+
+   RequestStore[CACHE_KEY] = candidate
+
+   true
+ end
+
+ # Stores the given host list in RequestStore; while present it is the
+ # list #host selects from (see #current_host_list).
+ def set_consistent_hosts_for_request(hosts)
+ RequestStore[VALID_HOSTS_CACHE_KEY] = hosts
+ end
+
+ # Yields a block, retrying it upon error using an exponential backoff.
+ # Yields the block, retrying it upon connection errors with a growing
+ # delay between attempts.
+ #
+ # retries - The maximum number of attempts.
+ # time    - The delay (in seconds) before the second attempt; it is
+ #           squared after every wait.
+ #
+ # Returns the value of the block. Errors that are not connection errors
+ # are re-raised immediately. Once all attempts have failed the last
+ # connection error is raised right away, without sleeping first.
+ def retry_with_backoff(retries = 3, time = 2)
+   retried = 0
+   last_error = nil
+
+   while retried < retries
+     begin
+       return yield
+     rescue StandardError => error
+       raise error unless connection_error?(error)
+
+       # We need to release the primary connection as otherwise Rails
+       # will keep raising errors when using the connection.
+       release_primary_connection
+
+       last_error = error
+       retried += 1
+
+       # Only wait when another attempt follows; sleeping before giving
+       # up would merely delay the error.
+       if retried < retries
+         sleep(time)
+         time **= 2
+       end
+     end
+   end
+
+   raise last_error
+ end
+
+ # Returns true if the given error (or, for the Rails wrapper errors, its
+ # cause) indicates a database connection problem.
+ #
+ # error - The exception to inspect. May be nil, which happens when a
+ #         wrapper error has no cause; nil is never a connection error.
+ def connection_error?(error)
+   case error
+   when nil
+     false
+   when ActiveRecord::StatementInvalid, ActionView::Template::Error
+     # After connecting to the DB Rails will wrap query errors using this
+     # class.
+     connection_error?(error.cause)
+   when *CONNECTION_ERRORS
+     true
+   else
+     # When PG tries to set the client encoding but fails due to a
+     # connection error it will raise a PG::Error instance. Catching that
+     # would catch all errors (even those we don't want), so instead we
+     # check for the message of the error.
+     error.message.start_with?('invalid encoding name:')
+   end
+ end
+
+ # Returns true if the root cause of the given error is a
+ # PG::TRSerializationFailure.
+ #
+ # error - The exception whose cause chain is followed to its end.
+ def serialization_failure?(error)
+   root_cause = error
+   root_cause = root_cause.cause while root_cause.cause
+
+   # Without the PG constant there is nothing to match against, and
+   # referencing it would raise NameError from inside a rescue handler.
+   return false unless defined?(PG::TRSerializationFailure)
+
+   root_cause.is_a?(PG::TRSerializationFailure)
+ end
+
+ private
+
+ # Enables the query cache of the current host unless it is already on.
+ def ensure_caching!
+ host.enable_query_cache! unless host.query_cache_enabled
+ end
+
+ # Records the role for the connection and increments its usage counter.
+ def track_connection_role(connection, role)
+ @connection_db_roles[connection] = role
+ @connection_db_roles_count[connection] ||= 0
+ @connection_db_roles_count[connection] += 1
+ end
+
+ # Decrements the usage counter of the connection and forgets its role
+ # once the counter reaches zero. Does nothing for blank or untracked
+ # connections.
+ def untrack_connection_role(connection)
+ return if connection.blank? || @connection_db_roles_count[connection].blank?
+
+ @connection_db_roles_count[connection] -= 1
+ if @connection_db_roles_count[connection] <= 0
+ @connection_db_roles.delete(connection)
+ @connection_db_roles_count.delete(connection)
+ end
+ end
+
+ # The host list stored in RequestStore if there is one, otherwise the
+ # full host list.
+ def current_host_list
+ RequestStore[VALID_HOSTS_CACHE_KEY] || @host_list
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/logger.rb b/lib/gitlab/database/load_balancing/logger.rb
new file mode 100644
index 00000000000..ee67ffcc99c
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/logger.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ # JSON logger used for load balancing events.
+ class Logger < ::Gitlab::JsonLogger
+ # Base name (without extension) used for this logger's file.
+ def self.file_name_noext
+ 'database_load_balancing'
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/rack_middleware.rb b/lib/gitlab/database/load_balancing/rack_middleware.rb
new file mode 100644
index 00000000000..4734ff99bd3
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/rack_middleware.rb
@@ -0,0 +1,98 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ # Rack middleware to handle sticking when serving Rails requests. Grape
+ # API calls are handled separately as different API endpoints need to
+ # stick based on different objects.
+ class RackMiddleware
+ # Rack env key under which .stick_or_unstick records [namespace, id] pairs.
+ STICK_OBJECT = 'load_balancing.stick_object'
+
+ # Unsticks or continues sticking the current request.
+ #
+ # This method also updates the Rack environment so #call can later
+ # determine if we still need to stick or not.
+ #
+ # env - The Rack environment.
+ # namespace - The namespace to use for sticking.
+ # id - The identifier to use for sticking.
+ def self.stick_or_unstick(env, namespace, id)
+   if LoadBalancing.enable?
+     Sticking.unstick_or_continue_sticking(namespace, id)
+
+     # Record the pair in the Rack env so #call can find it after the
+     # request has been handled.
+     (env[STICK_OBJECT] ||= Set.new) << [namespace, id]
+   end
+ end
+
+ # app - The next Rack application in the chain.
+ def initialize(app)
+ @app = app
+ end
+
+ # Handles a request: clears lingering state, applies sticking before and
+ # after calling the wrapped app, and clears the state again afterwards.
+ #
+ # env - The Rack environment.
+ def call(env)
+ # Ensure that any state that may have run before the first request
+ # doesn't linger around.
+ clear
+
+ unstick_or_continue_sticking(env)
+
+ result = @app.call(env)
+
+ stick_if_necessary(env)
+
+ result
+ ensure
+ # Runs even when the app raises.
+ clear
+ end
+
+ # Determine if we need to stick based on currently available user data.
+ #
+ # Typically this code will only be reachable for Rails requests as
+ # Grape data is not yet available at this point.
+ def unstick_or_continue_sticking(env)
+   sticking_namespaces_and_ids(env).each do |(namespace, id)|
+     Sticking.unstick_or_continue_sticking(namespace, id)
+   end
+ end
+
+ # Determine if we need to stick after handling a request.
+ def stick_if_necessary(env)
+   sticking_namespaces_and_ids(env).each do |(namespace, id)|
+     Sticking.stick_if_necessary(namespace, id)
+   end
+ end
+
+ # Releases the currently selected host and drops the current session.
+ def clear
+ load_balancer.release_host
+ Session.clear_session
+ end
+
+ # The load balancer behind the global connection proxy.
+ def load_balancer
+ LoadBalancing.proxy.load_balancer
+ end
+
+ # Determines the sticking namespace and identifier based on the Rack
+ # environment.
+ #
+ # For Rails requests this uses warden, but Grape and others have to
+ # manually set the right environment variable.
+ def sticking_namespaces_and_ids(env)
+   warden = env['warden']
+
+   # A signed-in warden user takes precedence over manually recorded
+   # sticking objects.
+   return [[:user, warden.user.id]] if warden && warden.user
+
+   recorded = env[STICK_OBJECT]
+   recorded.present? ? recorded.to_a : []
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/resolver.rb b/lib/gitlab/database/load_balancing/resolver.rb
new file mode 100644
index 00000000000..a291080cc3d
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/resolver.rb
@@ -0,0 +1,52 @@
+# frozen_string_literal: true
+
+require 'net/dns'
+require 'resolv'
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ # Turns a nameserver given as an IP address or a name into an address,
+ # trying (in order) the literal value, Resolv::Hosts and a DNS A lookup.
+ class Resolver
+   UnresolvableNameserverError = Class.new(StandardError)
+
+   # nameserver - The IP address or name of the nameserver.
+   def initialize(nameserver)
+     @nameserver = nameserver
+   end
+
+   # Returns the first address found by the lookups.
+   #
+   # Raises UnresolvableNameserverError if no lookup produced an address.
+   def resolve
+     address = ip_address || ip_address_from_hosts_file || ip_address_from_dns
+     return address if address
+
+     raise UnresolvableNameserverError, "could not resolve #{@nameserver}"
+   end
+
+   private
+
+   # The nameserver parsed as an IP literal, or nil if it is not one.
+   def ip_address
+     IPAddr.new(@nameserver)
+   rescue IPAddr::InvalidAddressError
+     nil
+   end
+
+   # The address Resolv::Hosts knows for the nameserver, or nil.
+   def ip_address_from_hosts_file
+     IPAddr.new(Resolv::Hosts.new.getaddress(@nameserver))
+   rescue Resolv::ResolvError
+     nil
+   end
+
+   # The address of the first A record for the nameserver, or nil when the
+   # answer is empty.
+   def ip_address_from_dns
+     records = Net::DNS::Resolver.start(@nameserver, Net::DNS::A).answer
+     records.first.address unless records.empty?
+   rescue Net::DNS::Resolver::NoResponseError
+     raise UnresolvableNameserverError, "no response from DNS server(s)"
+   end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/service_discovery.rb b/lib/gitlab/database/load_balancing/service_discovery.rb
new file mode 100644
index 00000000000..9b42b25be1c
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/service_discovery.rb
@@ -0,0 +1,187 @@
+# frozen_string_literal: true
+
+require 'net/dns'
+require 'resolv'
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ # Service discovery of secondary database hosts.
+ #
+ # Service discovery works by periodically looking up a DNS record. If the
+ # DNS record returns a new list of hosts, this class will update the load
+ # balancer with said hosts. Requests may continue to use the old hosts
+ # until they complete.
+ class ServiceDiscovery
+ attr_reader :interval, :record, :record_type, :disconnect_timeout
+
+ # Exclusive upper bound of the random amount added to every sleep in
+ # #start.
+ MAX_SLEEP_ADJUSTMENT = 10
+
+ # Supported record type names and the Net::DNS constants they map to.
+ RECORD_TYPES = {
+ 'A' => Net::DNS::A,
+ 'SRV' => Net::DNS::SRV
+ }.freeze
+
+ # A hostname with an optional port. Addresses are ordered by their string
+ # form so that lists of them can be sorted.
+ Address = Struct.new(:hostname, :port) do
+   def to_s
+     return hostname unless port
+
+     "#{hostname}:#{port}"
+   end
+
+   def <=>(other)
+     to_s <=> other.to_s
+   end
+ end
+
+ # nameserver - The nameserver to use for DNS lookups.
+ # port - The port of the nameserver.
+ # record - The DNS record to look up for retrieving the secondaries.
+ # record_type - The type of DNS record to look up
+ # interval - The time to wait between lookups.
+ # disconnect_timeout - The time after which an old host should be
+ # forcefully disconnected.
+ # use_tcp - Use TCP instead of UDP to look up resources
+ def initialize(nameserver:, port:, record:, record_type: 'A', interval: 60, disconnect_timeout: 120, use_tcp: false)
+ @nameserver = nameserver
+ @port = port
+ @record = record
+ # Raises ArgumentError for record types other than those in RECORD_TYPES.
+ @record_type = record_type_for(record_type)
+ @interval = interval
+ @disconnect_timeout = disconnect_timeout
+ @use_tcp = use_tcp
+ end
+
+ # Starts a thread that refreshes the hosts in an endless loop, sleeping
+ # between iterations. Returns the Thread.
+ def start
+ Thread.new do
+ loop do
+ # This local shadows the `interval` reader: it holds the wait time
+ # returned by the refresh, or the configured interval after an error.
+ interval =
+ begin
+ refresh_if_necessary
+ rescue StandardError => error
+ # Any exceptions that might occur should be reported to
+ # Sentry, instead of silently terminating this thread.
+ Gitlab::ErrorTracking.track_exception(error)
+
+ Gitlab::AppLogger.error(
+ "Service discovery encountered an error: #{error.message}"
+ )
+
+ self.interval
+ end
+
+ # We slightly randomize the sleep() interval. This should reduce
+ # the likelihood of _all_ processes refreshing at the same time,
+ # possibly putting unnecessary pressure on the DNS server.
+ sleep(interval + rand(MAX_SLEEP_ADJUSTMENT))
+ end
+ end
+ end
+
+ # Refreshes the hosts, but only if the DNS record returned a new list of
+ # addresses.
+ #
+ # The return value is the amount of time (in seconds) to wait before
+ # checking the DNS record for any changes.
+ def refresh_if_necessary
+   wait_time, dns_addresses = addresses_from_dns
+   known_addresses = addresses_from_load_balancer
+
+   # Both lists are sorted, so a plain comparison detects any change.
+   replace_hosts(dns_addresses) unless dns_addresses == known_addresses
+
+   wait_time
+ end
+
+ # Replaces all the hosts in the load balancer with the new ones,
+ # disconnecting the old connections.
+ #
+ # addresses - An Array of Address structs to use for the new hosts.
+ def replace_hosts(addresses)
+ # Snapshot of the current hosts, taken before they are swapped out.
+ old_hosts = load_balancer.host_list.hosts
+
+ load_balancer.host_list.hosts = addresses.map do |addr|
+ Host.new(addr.hostname, load_balancer, port: addr.port)
+ end
+
+ # We must explicitly disconnect the old connections, otherwise we may
+ # leak database connections over time. For example, if a request
+ # started just before we added the new hosts it will use an old
+ # host/connection. While this connection will be checked in and out,
+ # it won't be explicitly disconnected.
+ #
+ # Hosts are disconnected one after another; each call may wait up to
+ # disconnect_timeout.
+ old_hosts.each do |host|
+ host.disconnect!(disconnect_timeout)
+ end
+ end
+
+ # Returns an Array containing:
+ #
+ # 1. The time to wait for the next check.
+ # 2. An array containing the hostnames of the DNS record.
+ def addresses_from_dns
+ response = resolver.search(record, record_type)
+ resources = response.answer
+
+ # record_type was validated in the constructor, so one of the branches
+ # is expected to match.
+ addresses =
+ case record_type
+ when Net::DNS::A
+ addresses_from_a_record(resources)
+ when Net::DNS::SRV
+ addresses_from_srv_record(response)
+ end
+
+ # Addresses are sorted so we can directly compare the old and new
+ # addresses, without having to use any additional data structures.
+ [new_wait_time_for(resources), addresses.sort]
+ end
+
+ # Returns the time to wait before the next lookup: the TTL of the first
+ # resource, but never less than the preconfigured interval. Without a TTL
+ # the interval itself is used.
+ def new_wait_time_for(resources)
+   ttl = resources.first&.ttl
+   return interval unless ttl
+
+   ttl < interval ? interval : ttl
+ end
+
+ # Returns the sorted Address list of the hosts currently in the load
+ # balancer.
+ def addresses_from_load_balancer
+ load_balancer.host_list.host_names_and_ports.map do |hostname, port|
+ Address.new(hostname, port)
+ end.sort
+ end
+
+ # The load balancer behind the global connection proxy.
+ def load_balancer
+ LoadBalancing.proxy.load_balancer
+ end
+
+ # Memoized DNS resolver; the nameserver is resolved to an address the
+ # first time this is called.
+ def resolver
+ @resolver ||= Net::DNS::Resolver.new(
+ nameservers: Resolver.new(@nameserver).resolve,
+ port: @port,
+ use_tcp: @use_tcp
+ )
+ end
+
+ private
+
+ # Maps a record type name to its Net::DNS constant.
+ #
+ # Raises ArgumentError for names that are not in RECORD_TYPES.
+ def record_type_for(type)
+ RECORD_TYPES.fetch(type) do
+ raise(ArgumentError, "Unsupported record type: #{type}")
+ end
+ end
+
+ # Builds one Address per SRV answer, using SrvResolver to turn the target
+ # host into an address. Answers without an address are dropped.
+ def addresses_from_srv_record(response)
+ srv_resolver = SrvResolver.new(resolver, response.additional)
+
+ response.answer.map do |r|
+ address = srv_resolver.address_for(r.host.to_s)
+ next unless address
+
+ Address.new(address.to_s, r.port)
+ end.compact
+ end
+
+ # Builds one Address (without a port) per A record.
+ def addresses_from_a_record(resources)
+ resources.map { |r| Address.new(r.address.to_s) }
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/session.rb b/lib/gitlab/database/load_balancing/session.rb
new file mode 100644
index 00000000000..3682c9265c2
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/session.rb
@@ -0,0 +1,118 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ # Tracking of load balancing state per user session.
+ #
+ # A session starts at the beginning of a request and ends once the request
+ # has been completed. Sessions can be used to keep track of what hosts
+ # should be used for queries.
+ class Session
+   # Key under which the current session is kept in RequestStore.
+   CACHE_KEY = :gitlab_load_balancer_session
+
+   # Returns the session stored in RequestStore, creating and storing a new
+   # one when none is present.
+   def self.current
+     RequestStore[CACHE_KEY] ||= new
+   end
+
+   # Removes the stored session from RequestStore.
+   def self.clear_session
+     RequestStore.delete(CACHE_KEY)
+   end
+
+   # Runs the block on the current session with writes ignored for
+   # stickiness purposes (see #ignore_writes).
+   def self.without_sticky_writes(&block)
+     current.ignore_writes(&block)
+   end
+
+   def initialize
+     @use_primary = false
+     @performed_write = false
+     @ignore_writes = false
+     @fallback_to_replicas_for_ambiguous_queries = false
+     @use_replicas_for_read_queries = false
+   end
+
+   # Whether this session is currently set to use the primary.
+   def use_primary?
+     @use_primary
+   end
+
+   alias_method :using_primary?, :use_primary?
+
+   # Makes the session use the primary from now on.
+   def use_primary!
+     @use_primary = true
+   end
+
+   # Uses the primary for the duration of the block. Afterwards the previous
+   # setting is restored, unless a write has been recorded in this session,
+   # in which case the session keeps using the primary.
+   def use_primary(&blk)
+     used_primary = @use_primary
+     @use_primary = true
+     yield
+   ensure
+     @use_primary = used_primary || @performed_write
+   end
+
+   # Runs the block with writes ignored: #write! still records that a write
+   # happened, but does not switch the session to the primary.
+   #
+   # The previous flag is restored afterwards (instead of being reset to
+   # false) so that a nested call does not end the outer block's protection
+   # early. This matches the other block helpers of this class.
+   def ignore_writes(&block)
+     previous_flag = @ignore_writes
+     @ignore_writes = true
+
+     yield
+   ensure
+     @ignore_writes = previous_flag
+   end
+
+   # Indicates that the read SQL statements from anywhere inside this
+   # block should use a replica, regardless of the current primary
+   # stickiness or whether a write query is already performed in the
+   # current session. This interface is reserved mostly for performance
+   # purpose. This is a good tool to push expensive queries, which can
+   # tolerate the replica lags, to the replicas.
+   #
+   # Write and ambiguous queries inside this block are still handled by
+   # the primary.
+   def use_replicas_for_read_queries(&blk)
+     previous_flag = @use_replicas_for_read_queries
+     @use_replicas_for_read_queries = true
+     yield
+   ensure
+     @use_replicas_for_read_queries = previous_flag
+   end
+
+   # True only while inside a #use_replicas_for_read_queries block.
+   def use_replicas_for_read_queries?
+     @use_replicas_for_read_queries == true
+   end
+
+   # Indicate that the ambiguous SQL statements from anywhere inside this
+   # block should use a replica. The ambiguous statements include:
+   # - Transactions.
+   # - Custom queries (via exec_query, execute, etc.)
+   # - In-flight connection configuration change (SET LOCAL statement_timeout = 5000)
+   #
+   # This is a weak enforcement. This helper incorporates well with
+   # primary stickiness:
+   # - If the queries are about to write
+   # - The current session already performed writes
+   # - It prefers to use primary, aka, use_primary or use_primary! were called
+   def fallback_to_replicas_for_ambiguous_queries(&blk)
+     previous_flag = @fallback_to_replicas_for_ambiguous_queries
+     @fallback_to_replicas_for_ambiguous_queries = true
+     yield
+   ensure
+     @fallback_to_replicas_for_ambiguous_queries = previous_flag
+   end
+
+   # True inside a #fallback_to_replicas_for_ambiguous_queries block, as long
+   # as the session neither uses the primary nor has performed a write.
+   def fallback_to_replicas_for_ambiguous_queries?
+     @fallback_to_replicas_for_ambiguous_queries == true && !use_primary? && !performed_write?
+   end
+
+   # Records that a write happened. Unless writes are being ignored (see
+   # #ignore_writes), the session also switches to the primary.
+   def write!
+     @performed_write = true
+
+     return if @ignore_writes
+
+     use_primary!
+   end
+
+   # Whether #write! has been called on this session.
+   def performed_write?
+     @performed_write
+   end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/sidekiq_client_middleware.rb b/lib/gitlab/database/load_balancing/sidekiq_client_middleware.rb
new file mode 100644
index 00000000000..524d69c00c0
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/sidekiq_client_middleware.rb
@@ -0,0 +1,46 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ class SidekiqClientMiddleware
+   # Middleware entry point: resolves the worker class from its name,
+   # annotates the job with data consistency details and then yields.
+   def call(worker_class, job, _queue, _redis_pool)
+     resolved_class = worker_class.to_s.safe_constantize
+     mark_data_consistency_location(resolved_class, job)
+
+     yield
+   end
+
+   private
+
+   # Stores the worker's data consistency on the job and, for workers that
+   # utilize load balancing, the database location matching the current
+   # session. Jobs that already carry a location are left untouched.
+   def mark_data_consistency_location(worker_class, job)
+     return unless data_consistency_tracked?(worker_class)
+     return if location_already_provided?(job)
+
+     job['worker_data_consistency'] = worker_class.get_data_consistency
+     return unless worker_class.utilizes_load_balancing_capabilities?
+
+     key, location = current_location
+     job[key] = location
+   end
+
+   # Whether the job of this worker class should be annotated at all.
+   def data_consistency_tracked?(worker_class)
+     # Mailers can't be constantized
+     return false unless worker_class
+
+     worker_class.include?(::ApplicationWorker) &&
+       worker_class.get_data_consistency_feature_flag_enabled?
+   end
+
+   # Job key and location to store: the primary write location when the
+   # session uses the primary, otherwise the replica location of the host.
+   def current_location
+     if Session.current.use_primary?
+       ['database_write_location', load_balancer.primary_write_location]
+     else
+       ['database_replica_location', load_balancer.host.database_replica_location]
+     end
+   end
+
+   def location_already_provided?(job)
+     job['database_replica_location'] || job['database_write_location']
+   end
+
+   def load_balancer
+     LoadBalancing.proxy.load_balancer
+   end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb b/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb
new file mode 100644
index 00000000000..9bd0adf8dbd
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb
@@ -0,0 +1,71 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ class SidekiqServerMiddleware
+   # Raised to make a job fail when no replica is up to date yet.
+   JobReplicaNotUpToDate = Class.new(StandardError)
+
+   # Middleware entry point: sticks the session to the primary when the job
+   # requires it, runs the job, and always releases the host and clears the
+   # session afterwards.
+   def call(worker, job, _queue)
+     Session.current.use_primary! if requires_primary?(worker.class, job)
+
+     yield
+   ensure
+     clear
+   end
+
+   private
+
+   def clear
+     load_balancer.release_host
+     Session.clear_session
+   end
+
+   # Decides whether the job must run against the primary. The decision is
+   # also recorded on the job under :data_consistency and :database_chosen.
+   #
+   # Raises JobReplicaNotUpToDate when the outcome is 'retry'.
+   def requires_primary?(worker_class, job)
+     return true unless load_balancing_capable?(worker_class)
+
+     location = job['database_write_location'] || job['database_replica_location']
+     return true unless location
+
+     consistency = worker_class.get_data_consistency
+     job[:data_consistency] = consistency.to_s
+
+     chosen = database_for(location, consistency, job)
+     job[:database_chosen] = chosen
+
+     case chosen
+     when 'replica'
+       false
+     when 'retry'
+       raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\
+         " Replica was not up to date."
+     else
+       true
+     end
+   end
+
+   # Whether the worker class opts into load balancing at all.
+   def load_balancing_capable?(worker_class)
+     worker_class.include?(::ApplicationWorker) &&
+       worker_class.utilizes_load_balancing_capabilities? &&
+       worker_class.get_data_consistency_feature_flag_enabled?
+   end
+
+   # 'replica' when a replica has caught up with the location, 'retry' for
+   # :delayed jobs that have not been retried yet, 'primary' otherwise.
+   def database_for(location, consistency, job)
+     return 'replica' if replica_caught_up?(location)
+     return 'retry' if consistency == :delayed && not_yet_retried?(job)
+
+     'primary'
+   end
+
+   def not_yet_retried?(job)
+     # A `nil` retry_count means the job was never retried; `0` already
+     # indicates the first retry.
+     job['retry_count'].nil?
+   end
+
+   def load_balancer
+     LoadBalancing.proxy.load_balancer
+   end
+
+   def replica_caught_up?(location)
+     rotate = Feature.enabled?(:sidekiq_load_balancing_rotate_up_to_date_replica)
+     return load_balancer.select_up_to_date_host(location) if rotate
+
+     load_balancer.host.caught_up?(location)
+   end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/srv_resolver.rb b/lib/gitlab/database/load_balancing/srv_resolver.rb
new file mode 100644
index 00000000000..20da525f4d2
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/srv_resolver.rb
@@ -0,0 +1,46 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ # Hostnames returned in SRV records sometimes cannot be resolved by a
+ # local resolver. However, their A/AAAA records may be returned in the
+ # additional section of the SRV response, so we try to extract the IPs
+ # from there first, falling back to querying each hostname's A/AAAA
+ # records one by one, using the same resolver that queried the SRV
+ # record.
+ class SrvResolver
+   include Gitlab::Utils::StrongMemoize
+
+   attr_reader :resolver, :additional
+
+   # resolver   - object used to look up hosts missing from `additional`.
+   # additional - records of the additional section of an SRV response.
+   def initialize(resolver, additional)
+     @resolver = resolver
+     @additional = additional
+   end
+
+   # Address for `host`: taken from the additional records when present,
+   # otherwise looked up through the resolver. May be nil.
+   def address_for(host)
+     known = addresses_from_additional[host]
+
+     known || resolve_host(host)
+   end
+
+   private
+
+   # Memoized Hash of record name => address built from the A/AAAA records
+   # of the additional section.
+   def addresses_from_additional
+     strong_memoize(:addresses_from_additional) do
+       additional
+         .select { |rr| address_record?(rr) }
+         .map { |rr| [rr.name, rr.address] }
+         .to_h
+     end
+   end
+
+   # Queries the resolver for `host` and returns the address of the first
+   # A/AAAA record in the answer, or nil when there is none.
+   def resolve_host(host)
+     answers = resolver.search(host, Net::DNS::ANY).answer
+
+     answers.find { |rr| address_record?(rr) }&.address
+   end
+
+   def address_record?(rr)
+     rr.is_a?(Net::DNS::RR::A) || rr.is_a?(Net::DNS::RR::AAAA)
+   end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/database/load_balancing/sticking.rb b/lib/gitlab/database/load_balancing/sticking.rb
new file mode 100644
index 00000000000..efbd7099300
--- /dev/null
+++ b/lib/gitlab/database/load_balancing/sticking.rb
@@ -0,0 +1,147 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Database
+ module LoadBalancing
+ # Module used for handling sticking connections to a primary, if
+ # necessary.
+ #
+ # ## Examples
+ #
+ # Sticking a user to the primary:
+ #
+ # Sticking.stick_if_necessary(:user, current_user.id)
+ #
+ # To unstick if possible, or continue using the primary otherwise:
+ #
+ # Sticking.unstick_or_continue_sticking(:user, current_user.id)
+ module Sticking
+   # Lifetime in seconds of a stored write location (passed to Redis as
+   # `ex:`); after it a session should stop reading from the primary.
+   EXPIRATION = 30
+
+   # Sticks to the primary, but only when load balancing is enabled and the
+   # current session performed a write.
+   def self.stick_if_necessary(namespace, id)
+     return unless LoadBalancing.enable? && Session.current.performed_write?
+
+     stick(namespace, id)
+   end
+
+   # Checks whether the load balancer reports being caught up with the last
+   # stored write location, unsticking when it is. Without a stored
+   # location the answer is true.
+   def self.all_caught_up?(namespace, id)
+     location = last_write_location_for(namespace, id)
+     return true unless location
+
+     caught_up = load_balancer.all_caught_up?(location)
+     unstick(namespace, id) if caught_up
+
+     caught_up
+   end
+
+   # Selects hosts that have caught up with the primary. This ensures
+   # atomic selection of the host to prevent the host list changing
+   # in another thread.
+   #
+   # Returns true if one host was selected.
+   def self.select_caught_up_replicas(namespace, id)
+     location = last_write_location_for(namespace, id)
+
+     # Unlike all_caught_up?, a missing write location yields false: we want
+     # to be sure we talk to a replica that has caught up for a specific
+     # write location, so without one we err on the side of caution.
+     return false unless location
+
+     selected = load_balancer.select_caught_up_hosts(location)
+     unstick(namespace, id) if selected
+
+     selected
+   end
+
+   # Keeps using the primary when not everything is caught up; otherwise
+   # the object is unstuck (as a side effect of all_caught_up?).
+   def self.unstick_or_continue_sticking(namespace, id)
+     return if all_caught_up?(namespace, id)
+
+     Session.current.use_primary!
+   end
+
+   # Selects a replica that has caught up with the primary. If none is
+   # found, the session sticks to the primary.
+   def self.select_valid_host(namespace, id)
+     return if select_caught_up_replicas(namespace, id)
+
+     Session.current.use_primary!
+   end
+
+   # Starts sticking to the primary for the given namespace and id, using
+   # the latest WAL pointer from the primary.
+   def self.stick(namespace, id)
+     return unless LoadBalancing.enable?
+
+     mark_primary_write_location(namespace, id)
+     Session.current.use_primary!
+   end
+
+   # Same as stick, for several ids sharing one write location.
+   def self.bulk_stick(namespace, ids)
+     return unless LoadBalancing.enable?
+
+     with_primary_write_location do |location|
+       ids.each { |id| set_write_location_for(namespace, id, location) }
+     end
+
+     Session.current.use_primary!
+   end
+
+   # Yields the current primary write location. Nothing is yielded when
+   # load balancing is not configured or the location is blank.
+   def self.with_primary_write_location
+     return unless LoadBalancing.configured?
+
+     # Load balancing could be enabled for the Web application server,
+     # but it's not activated for Sidekiq. We should update Redis with
+     # the write location just in case load balancing is being used.
+     location = LoadBalancing.enable? ? load_balancer.primary_write_location : Gitlab::Database.get_write_location(ActiveRecord::Base.connection)
+
+     yield(location) unless location.blank?
+   end
+
+   # Stores the current primary write location for the namespace and id.
+   def self.mark_primary_write_location(namespace, id)
+     with_primary_write_location { |location| set_write_location_for(namespace, id, location) }
+   end
+
+   # Stops sticking to the primary.
+   def self.unstick(namespace, id)
+     key = redis_key_for(namespace, id)
+
+     Gitlab::Redis::SharedState.with { |redis| redis.del(key) }
+   end
+
+   # Writes the location to Redis with an expiry of EXPIRATION seconds.
+   def self.set_write_location_for(namespace, id, location)
+     key = redis_key_for(namespace, id)
+
+     Gitlab::Redis::SharedState.with { |redis| redis.set(key, location, ex: EXPIRATION) }
+   end
+
+   # Reads the stored write location from Redis.
+   def self.last_write_location_for(namespace, id)
+     key = redis_key_for(namespace, id)
+
+     Gitlab::Redis::SharedState.with { |redis| redis.get(key) }
+   end
+
+   def self.redis_key_for(namespace, id)
+     "database-load-balancing/write-location/#{namespace}/#{id}"
+   end
+
+   def self.load_balancer
+     LoadBalancing.proxy.load_balancer
+   end
+ end
+ end
+ end
+end