diff options
Diffstat (limited to 'lib/gitlab/database/load_balancing/load_balancer.rb')
-rw-r--r-- | lib/gitlab/database/load_balancing/load_balancer.rb | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/lib/gitlab/database/load_balancing/load_balancer.rb b/lib/gitlab/database/load_balancing/load_balancer.rb new file mode 100644 index 00000000000..a833bb8491f --- /dev/null +++ b/lib/gitlab/database/load_balancing/load_balancer.rb @@ -0,0 +1,275 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Load balancing for ActiveRecord connections. + # + # Each host in the load balancer uses the same credentials as the primary + # database. + # + # This class *requires* that `ActiveRecord::Base.retrieve_connection` + # always returns a connection to the primary. + class LoadBalancer + CACHE_KEY = :gitlab_load_balancer_host + VALID_HOSTS_CACHE_KEY = :gitlab_load_balancer_valid_hosts + + attr_reader :host_list + + # hosts - The hostnames/addresses of the additional databases. + def initialize(hosts = []) + @host_list = HostList.new(hosts.map { |addr| Host.new(addr, self) }) + @connection_db_roles = {}.compare_by_identity + @connection_db_roles_count = {}.compare_by_identity + end + + # Yields a connection that can be used for reads. + # + # If no secondaries were available this method will use the primary + # instead. + def read(&block) + connection = nil + conflict_retried = 0 + + while host + ensure_caching! + + begin + connection = host.connection + track_connection_role(connection, ROLE_REPLICA) + + return yield connection + rescue StandardError => error + untrack_connection_role(connection) + + if serialization_failure?(error) + # This error can occur when a query conflicts. See + # https://www.postgresql.org/docs/current/static/hot-standby.html#HOT-STANDBY-CONFLICT + # for more information. + # + # In this event we'll cycle through the secondaries at most 3 + # times before using the primary instead. + will_retry = conflict_retried < @host_list.length * 3 + + LoadBalancing::Logger.warn( + event: :host_query_conflict, + message: 'Query conflict on host', + conflict_retried: conflict_retried, + will_retry: will_retry, + db_host: host.host, + db_port: host.port, + host_list_length: @host_list.length + ) + + if will_retry + conflict_retried += 1 + release_host + else + break + end + elsif connection_error?(error) + host.offline! + release_host + else + raise error + end + end + end + + LoadBalancing::Logger.warn( + event: :no_secondaries_available, + message: 'No secondaries were available, using primary instead', + conflict_retried: conflict_retried, + host_list_length: @host_list.length + ) + + read_write(&block) + ensure + untrack_connection_role(connection) + end + + # Yields a connection that can be used for both reads and writes. + def read_write + connection = nil + # In the event of a failover the primary may be briefly unavailable. + # Instead of immediately grinding to a halt we'll retry the operation + # a few times. + retry_with_backoff do + connection = ActiveRecord::Base.retrieve_connection + track_connection_role(connection, ROLE_PRIMARY) + + yield connection + end + ensure + untrack_connection_role(connection) + end + + # Recognize the role (primary/replica) of the database this connection + # is connecting to. If the connection is not issued by this load + # balancer, return nil + def db_role_for_connection(connection) + return @connection_db_roles[connection] if @connection_db_roles[connection] + return ROLE_REPLICA if @host_list.manage_pool?(connection.pool) + return ROLE_PRIMARY if connection.pool == ActiveRecord::Base.connection_pool + end + + # Returns a host to use for queries. + # + # Hosts are scoped per thread so that multiple threads don't + # accidentally re-use the same host + connection. + def host + RequestStore[CACHE_KEY] ||= current_host_list.next + end + + # Releases the host and connection for the current thread. + def release_host + if host = RequestStore[CACHE_KEY] + host.disable_query_cache! + host.release_connection + end + + RequestStore.delete(CACHE_KEY) + RequestStore.delete(VALID_HOSTS_CACHE_KEY) + end + + def release_primary_connection + ActiveRecord::Base.connection_pool.release_connection + end + + # Returns the transaction write location of the primary. + def primary_write_location + location = read_write do |connection| + ::Gitlab::Database.get_write_location(connection) + end + + return location if location + + raise 'Failed to determine the write location of the primary database' + end + + # Returns true if all hosts have caught up to the given transaction + # write location. + def all_caught_up?(location) + @host_list.hosts.all? { |host| host.caught_up?(location) } + end + + # Returns true if there was at least one host that has caught up with the given transaction. + # + # In case of a retry, this method also stores the set of hosts that have caught up. + def select_caught_up_hosts(location) + all_hosts = @host_list.hosts + valid_hosts = all_hosts.select { |host| host.caught_up?(location) } + + return false if valid_hosts.empty? + + # Hosts can come online after the time when this scan was done, + # so we need to remember the ones that can be used. If the host went + # offline, we'll just rely on the retry mechanism to use the primary. + set_consistent_hosts_for_request(HostList.new(valid_hosts)) + + # Since we will be using a subset from the original list, let's just + # pick a random host and mix up the original list to ensure we don't + # only end up using one replica. + RequestStore[CACHE_KEY] = valid_hosts.sample + @host_list.shuffle + + true + end + + # Returns true if there was at least one host that has caught up with the given transaction. + # Similar to `#select_caught_up_hosts`, picks a random host, to rotate replicas we use. + # Unlike `#select_caught_up_hosts`, does not iterate over all hosts if finds any. + def select_up_to_date_host(location) + all_hosts = @host_list.hosts.shuffle + host = all_hosts.find { |host| host.caught_up?(location) } + + return false unless host + + RequestStore[CACHE_KEY] = host + + true + end + + def set_consistent_hosts_for_request(hosts) + RequestStore[VALID_HOSTS_CACHE_KEY] = hosts + end + + # Yields a block, retrying it upon error using an exponential backoff. + def retry_with_backoff(retries = 3, time = 2) + retried = 0 + last_error = nil + + while retried < retries + begin + return yield + rescue StandardError => error + raise error unless connection_error?(error) + + # We need to release the primary connection as otherwise Rails + # will keep raising errors when using the connection. + release_primary_connection + + last_error = error + sleep(time) + retried += 1 + time **= 2 + end + end + + raise last_error + end + + def connection_error?(error) + case error + when ActiveRecord::StatementInvalid, ActionView::Template::Error + # After connecting to the DB Rails will wrap query errors using this + # class. + connection_error?(error.cause) + when *CONNECTION_ERRORS + true + else + # When PG tries to set the client encoding but fails due to a + # connection error it will raise a PG::Error instance. Catching that + # would catch all errors (even those we don't want), so instead we + # check for the message of the error. + error.message.start_with?('invalid encoding name:') + end + end + + def serialization_failure?(error) + if error.cause + serialization_failure?(error.cause) + else + error.is_a?(PG::TRSerializationFailure) + end + end + + private + + def ensure_caching! + host.enable_query_cache! unless host.query_cache_enabled + end + + def track_connection_role(connection, role) + @connection_db_roles[connection] = role + @connection_db_roles_count[connection] ||= 0 + @connection_db_roles_count[connection] += 1 + end + + def untrack_connection_role(connection) + return if connection.blank? || @connection_db_roles_count[connection].blank? + + @connection_db_roles_count[connection] -= 1 + if @connection_db_roles_count[connection] <= 0 + @connection_db_roles.delete(connection) + @connection_db_roles_count.delete(connection) + end + end + + def current_host_list + RequestStore[VALID_HOSTS_CACHE_KEY] || @host_list + end + end + end + end +end |