diff options
Diffstat (limited to 'lib/gitlab/database/load_balancing')
-rw-r--r-- | lib/gitlab/database/load_balancing/active_record_proxy.rb | 15 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/connection_proxy.rb | 140 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/host.rb | 209 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/host_list.rb | 99 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/load_balancer.rb | 275 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/logger.rb | 13 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/rack_middleware.rb | 98 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/resolver.rb | 52 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/service_discovery.rb | 187 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/session.rb | 118 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/sidekiq_client_middleware.rb | 46 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb | 71 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/srv_resolver.rb | 46 | ||||
-rw-r--r-- | lib/gitlab/database/load_balancing/sticking.rb | 147 |
14 files changed, 1516 insertions, 0 deletions
diff --git a/lib/gitlab/database/load_balancing/active_record_proxy.rb b/lib/gitlab/database/load_balancing/active_record_proxy.rb new file mode 100644 index 00000000000..7763497e770 --- /dev/null +++ b/lib/gitlab/database/load_balancing/active_record_proxy.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Module injected into ActiveRecord::Base to allow hijacking of the + # "connection" method. + module ActiveRecordProxy + def connection + LoadBalancing.proxy + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/connection_proxy.rb b/lib/gitlab/database/load_balancing/connection_proxy.rb new file mode 100644 index 00000000000..3a09689a724 --- /dev/null +++ b/lib/gitlab/database/load_balancing/connection_proxy.rb @@ -0,0 +1,140 @@ +# frozen_string_literal: true + +# rubocop:disable GitlabSecurity/PublicSend + +module Gitlab + module Database + module LoadBalancing + # Redirecting of ActiveRecord connections. + # + # The ConnectionProxy class redirects ActiveRecord connection requests to + # the right load balancer pool, depending on the type of query. + class ConnectionProxy + WriteInsideReadOnlyTransactionError = Class.new(StandardError) + READ_ONLY_TRANSACTION_KEY = :load_balacing_read_only_transaction + + attr_reader :load_balancer + + # These methods perform writes after which we need to stick to the + # primary. + STICKY_WRITES = %i( + delete + delete_all + insert + update + update_all + ).freeze + + NON_STICKY_READS = %i( + sanitize_limit + select + select_one + select_rows + quote_column_name + ).freeze + + # hosts - The hosts to use for load balancing. + def initialize(hosts = []) + @load_balancer = LoadBalancer.new(hosts) + end + + def select_all(arel, name = nil, binds = [], preparable: nil) + if arel.respond_to?(:locked) && arel.locked + # SELECT ... FOR UPDATE queries should be sent to the primary. 
+ write_using_load_balancer(:select_all, [arel, name, binds], + sticky: true) + else + read_using_load_balancer(:select_all, [arel, name, binds]) + end + end + + NON_STICKY_READS.each do |name| + define_method(name) do |*args, &block| + read_using_load_balancer(name, args, &block) + end + end + + STICKY_WRITES.each do |name| + define_method(name) do |*args, &block| + write_using_load_balancer(name, args, sticky: true, &block) + end + end + + def transaction(*args, &block) + if current_session.fallback_to_replicas_for_ambiguous_queries? + track_read_only_transaction! + read_using_load_balancer(:transaction, args, &block) + else + write_using_load_balancer(:transaction, args, sticky: true, &block) + end + + ensure + untrack_read_only_transaction! + end + + # Delegates all unknown messages to a read-write connection. + def method_missing(name, *args, &block) + if current_session.fallback_to_replicas_for_ambiguous_queries? + read_using_load_balancer(name, args, &block) + else + write_using_load_balancer(name, args, &block) + end + end + + # Performs a read using the load balancer. + # + # name - The name of the method to call on a connection object. + def read_using_load_balancer(name, args, &block) + if current_session.use_primary? && + !current_session.use_replicas_for_read_queries? + @load_balancer.read_write do |connection| + connection.send(name, *args, &block) + end + else + @load_balancer.read do |connection| + connection.send(name, *args, &block) + end + end + end + + # Performs a write using the load balancer. + # + # name - The name of the method to call on a connection object. + # sticky - If set to true the session will stick to the master after + # the write. + def write_using_load_balancer(name, args, sticky: false, &block) + if read_only_transaction? 
+ raise WriteInsideReadOnlyTransactionError, 'A write query is performed inside a read-only transaction' + end + + @load_balancer.read_write do |connection| + # Sticking has to be enabled before calling the method. Not doing so + # could lead to methods called in a block still being performed on a + # secondary instead of on a primary (when necessary). + current_session.write! if sticky + + connection.send(name, *args, &block) + end + end + + private + + def current_session + ::Gitlab::Database::LoadBalancing::Session.current + end + + def track_read_only_transaction! + Thread.current[READ_ONLY_TRANSACTION_KEY] = true + end + + def untrack_read_only_transaction! + Thread.current[READ_ONLY_TRANSACTION_KEY] = nil + end + + def read_only_transaction? + Thread.current[READ_ONLY_TRANSACTION_KEY] == true + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/host.rb b/lib/gitlab/database/load_balancing/host.rb new file mode 100644 index 00000000000..3e74b5ea727 --- /dev/null +++ b/lib/gitlab/database/load_balancing/host.rb @@ -0,0 +1,209 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # A single database host used for load balancing. + class Host + attr_reader :pool, :last_checked_at, :intervals, :load_balancer, :host, :port + + delegate :connection, :release_connection, :enable_query_cache!, :disable_query_cache!, :query_cache_enabled, to: :pool + + CONNECTION_ERRORS = + if defined?(PG) + [ + ActionView::Template::Error, + ActiveRecord::StatementInvalid, + PG::Error + ].freeze + else + [ + ActionView::Template::Error, + ActiveRecord::StatementInvalid + ].freeze + end + + # host - The address of the database. + # load_balancer - The LoadBalancer that manages this Host. 
+ def initialize(host, load_balancer, port: nil) + @host = host + @port = port + @load_balancer = load_balancer + @pool = Database.create_connection_pool(LoadBalancing.pool_size, host, port) + @online = true + @last_checked_at = Time.zone.now + + interval = LoadBalancing.replica_check_interval + @intervals = (interval..(interval * 2)).step(0.5).to_a + end + + # Disconnects the pool, once all connections are no longer in use. + # + # timeout - The time after which the pool should be forcefully + # disconnected. + def disconnect!(timeout = 120) + start_time = Metrics::System.monotonic_time + + while (Metrics::System.monotonic_time - start_time) <= timeout + break if pool.connections.none?(&:in_use?) + + sleep(2) + end + + pool.disconnect! + end + + def offline! + LoadBalancing::Logger.warn( + event: :host_offline, + message: 'Marking host as offline', + db_host: @host, + db_port: @port + ) + + @online = false + @pool.disconnect! + end + + # Returns true if the host is online. + def online? + return @online unless check_replica_status? + + refresh_status + + if @online + LoadBalancing::Logger.info( + event: :host_online, + message: 'Host is online after replica status check', + db_host: @host, + db_port: @port + ) + else + LoadBalancing::Logger.warn( + event: :host_offline, + message: 'Host is offline after replica status check', + db_host: @host, + db_port: @port + ) + end + + @online + rescue *CONNECTION_ERRORS + offline! + false + end + + def refresh_status + @online = replica_is_up_to_date? + @last_checked_at = Time.zone.now + end + + def check_replica_status? + (Time.zone.now - last_checked_at) >= intervals.sample + end + + def replica_is_up_to_date? + replication_lag_below_threshold? || data_is_recent_enough? + end + + def replication_lag_below_threshold? + if (lag_time = replication_lag_time) + lag_time <= LoadBalancing.max_replication_lag_time + else + false + end + end + + # Returns true if the replica has replicated enough data to be useful. 
+ def data_is_recent_enough? + # It's possible for a replica to not replay WAL data for a while, + # despite being up to date. This can happen when a primary does not + # receive any writes for a while. + # + # To prevent this from happening we check if the lag size (in bytes) + # of the replica is small enough for the replica to be useful. We + # only do this if we haven't replicated in a while so we only need + # to connect to the primary when truly necessary. + if (lag_size = replication_lag_size) + lag_size <= LoadBalancing.max_replication_difference + else + false + end + end + + # Returns the replication lag time of this secondary in seconds as a + # float. + # + # This method will return nil if no lag time could be calculated. + def replication_lag_time + row = query_and_release('SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::float as lag') + + row['lag'].to_f if row.any? + end + + # Returns the number of bytes this secondary is lagging behind the + # primary. + # + # This method will return nil if no lag size could be calculated. + def replication_lag_size + location = connection.quote(primary_write_location) + row = query_and_release(<<-SQL.squish) + SELECT pg_wal_lsn_diff(#{location}, pg_last_wal_replay_lsn())::float + AS diff + SQL + + row['diff'].to_i if row.any? + rescue *CONNECTION_ERRORS + nil + end + + def primary_write_location + load_balancer.primary_write_location + ensure + load_balancer.release_primary_connection + end + + def database_replica_location + row = query_and_release(<<-SQL.squish) + SELECT pg_last_wal_replay_lsn()::text AS location + SQL + + row['location'] if row.any? + rescue *CONNECTION_ERRORS + nil + end + + # Returns true if this host has caught up to the given transaction + # write location. + # + # location - The transaction write location as reported by a primary. 
+ def caught_up?(location) + string = connection.quote(location) + + # In case the host is a primary pg_last_wal_replay_lsn/pg_last_xlog_replay_location() returns + # NULL. The recovery check ensures we treat the host as up-to-date in + # such a case. + query = <<-SQL.squish + SELECT NOT pg_is_in_recovery() + OR pg_wal_lsn_diff(pg_last_wal_replay_lsn(), #{string}) >= 0 + AS result + SQL + + row = query_and_release(query) + + ::Gitlab::Utils.to_boolean(row['result']) + rescue *CONNECTION_ERRORS + false + end + + def query_and_release(sql) + connection.select_all(sql).first || {} + rescue StandardError + {} + ensure + release_connection + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/host_list.rb b/lib/gitlab/database/load_balancing/host_list.rb new file mode 100644 index 00000000000..24800012947 --- /dev/null +++ b/lib/gitlab/database/load_balancing/host_list.rb @@ -0,0 +1,99 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # A list of database hosts to use for connections. + class HostList + # hosts - The list of secondary hosts to add. + def initialize(hosts = []) + @hosts = hosts.shuffle + @pools = Set.new + @index = 0 + @mutex = Mutex.new + @hosts_gauge = Gitlab::Metrics.gauge(:db_load_balancing_hosts, 'Current number of load balancing hosts') + + set_metrics! + update_pools + end + + def hosts + @mutex.synchronize { @hosts.dup } + end + + def shuffle + @mutex.synchronize do + unsafe_shuffle + end + end + + def length + @mutex.synchronize { @hosts.length } + end + + def host_names_and_ports + @mutex.synchronize { @hosts.map { |host| [host.host, host.port] } } + end + + def manage_pool?(pool) + @pools.include?(pool) + end + + def hosts=(hosts) + @mutex.synchronize do + @hosts = hosts + unsafe_shuffle + update_pools + end + + set_metrics! + end + + # Sets metrics before returning next host + def next + next_host.tap do |_| + set_metrics! 
+ end + end + + private + + def unsafe_shuffle + @hosts = @hosts.shuffle + @index = 0 + end + + # Returns the next available host. + # + # Returns a Gitlab::Database::LoadBalancing::Host instance, or nil if no + # hosts were available. + def next_host + @mutex.synchronize do + break if @hosts.empty? + + started_at = @index + + loop do + host = @hosts[@index] + @index = (@index + 1) % @hosts.length + + break host if host.online? + + # Return nil once we have cycled through all hosts and none were + # available. + break if @index == started_at + end + end + end + + def set_metrics! + @hosts_gauge.set({}, @hosts.length) + end + + def update_pools + @pools = Set.new(@hosts.map(&:pool)) + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/load_balancer.rb b/lib/gitlab/database/load_balancing/load_balancer.rb new file mode 100644 index 00000000000..a833bb8491f --- /dev/null +++ b/lib/gitlab/database/load_balancing/load_balancer.rb @@ -0,0 +1,275 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Load balancing for ActiveRecord connections. + # + # Each host in the load balancer uses the same credentials as the primary + # database. + # + # This class *requires* that `ActiveRecord::Base.retrieve_connection` + # always returns a connection to the primary. + class LoadBalancer + CACHE_KEY = :gitlab_load_balancer_host + VALID_HOSTS_CACHE_KEY = :gitlab_load_balancer_valid_hosts + + attr_reader :host_list + + # hosts - The hostnames/addresses of the additional databases. + def initialize(hosts = []) + @host_list = HostList.new(hosts.map { |addr| Host.new(addr, self) }) + @connection_db_roles = {}.compare_by_identity + @connection_db_roles_count = {}.compare_by_identity + end + + # Yields a connection that can be used for reads. + # + # If no secondaries were available this method will use the primary + # instead. + def read(&block) + connection = nil + conflict_retried = 0 + + while host + ensure_caching! 
+ + begin + connection = host.connection + track_connection_role(connection, ROLE_REPLICA) + + return yield connection + rescue StandardError => error + untrack_connection_role(connection) + + if serialization_failure?(error) + # This error can occur when a query conflicts. See + # https://www.postgresql.org/docs/current/static/hot-standby.html#HOT-STANDBY-CONFLICT + # for more information. + # + # In this event we'll cycle through the secondaries at most 3 + # times before using the primary instead. + will_retry = conflict_retried < @host_list.length * 3 + + LoadBalancing::Logger.warn( + event: :host_query_conflict, + message: 'Query conflict on host', + conflict_retried: conflict_retried, + will_retry: will_retry, + db_host: host.host, + db_port: host.port, + host_list_length: @host_list.length + ) + + if will_retry + conflict_retried += 1 + release_host + else + break + end + elsif connection_error?(error) + host.offline! + release_host + else + raise error + end + end + end + + LoadBalancing::Logger.warn( + event: :no_secondaries_available, + message: 'No secondaries were available, using primary instead', + conflict_retried: conflict_retried, + host_list_length: @host_list.length + ) + + read_write(&block) + ensure + untrack_connection_role(connection) + end + + # Yields a connection that can be used for both reads and writes. + def read_write + connection = nil + # In the event of a failover the primary may be briefly unavailable. + # Instead of immediately grinding to a halt we'll retry the operation + # a few times. + retry_with_backoff do + connection = ActiveRecord::Base.retrieve_connection + track_connection_role(connection, ROLE_PRIMARY) + + yield connection + end + ensure + untrack_connection_role(connection) + end + + # Recognize the role (primary/replica) of the database this connection + # is connecting to. 
If the connection is not issued by this load + # balancer, return nil + def db_role_for_connection(connection) + return @connection_db_roles[connection] if @connection_db_roles[connection] + return ROLE_REPLICA if @host_list.manage_pool?(connection.pool) + return ROLE_PRIMARY if connection.pool == ActiveRecord::Base.connection_pool + end + + # Returns a host to use for queries. + # + # Hosts are scoped per thread so that multiple threads don't + # accidentally re-use the same host + connection. + def host + RequestStore[CACHE_KEY] ||= current_host_list.next + end + + # Releases the host and connection for the current thread. + def release_host + if host = RequestStore[CACHE_KEY] + host.disable_query_cache! + host.release_connection + end + + RequestStore.delete(CACHE_KEY) + RequestStore.delete(VALID_HOSTS_CACHE_KEY) + end + + def release_primary_connection + ActiveRecord::Base.connection_pool.release_connection + end + + # Returns the transaction write location of the primary. + def primary_write_location + location = read_write do |connection| + ::Gitlab::Database.get_write_location(connection) + end + + return location if location + + raise 'Failed to determine the write location of the primary database' + end + + # Returns true if all hosts have caught up to the given transaction + # write location. + def all_caught_up?(location) + @host_list.hosts.all? { |host| host.caught_up?(location) } + end + + # Returns true if there was at least one host that has caught up with the given transaction. + # + # In case of a retry, this method also stores the set of hosts that have caught up. + def select_caught_up_hosts(location) + all_hosts = @host_list.hosts + valid_hosts = all_hosts.select { |host| host.caught_up?(location) } + + return false if valid_hosts.empty? + + # Hosts can come online after the time when this scan was done, + # so we need to remember the ones that can be used. If the host went + # offline, we'll just rely on the retry mechanism to use the primary. 
+ set_consistent_hosts_for_request(HostList.new(valid_hosts)) + + # Since we will be using a subset from the original list, let's just + # pick a random host and mix up the original list to ensure we don't + # only end up using one replica. + RequestStore[CACHE_KEY] = valid_hosts.sample + @host_list.shuffle + + true + end + + # Returns true if there was at least one host that has caught up with the given transaction. + # Similar to `#select_caught_up_hosts`, picks a random host, to rotate replicas we use. + # Unlike `#select_caught_up_hosts`, does not iterate over all hosts if finds any. + def select_up_to_date_host(location) + all_hosts = @host_list.hosts.shuffle + host = all_hosts.find { |host| host.caught_up?(location) } + + return false unless host + + RequestStore[CACHE_KEY] = host + + true + end + + def set_consistent_hosts_for_request(hosts) + RequestStore[VALID_HOSTS_CACHE_KEY] = hosts + end + + # Yields a block, retrying it upon error using an exponential backoff. + def retry_with_backoff(retries = 3, time = 2) + retried = 0 + last_error = nil + + while retried < retries + begin + return yield + rescue StandardError => error + raise error unless connection_error?(error) + + # We need to release the primary connection as otherwise Rails + # will keep raising errors when using the connection. + release_primary_connection + + last_error = error + sleep(time) + retried += 1 + time **= 2 + end + end + + raise last_error + end + + def connection_error?(error) + case error + when ActiveRecord::StatementInvalid, ActionView::Template::Error + # After connecting to the DB Rails will wrap query errors using this + # class. + connection_error?(error.cause) + when *CONNECTION_ERRORS + true + else + # When PG tries to set the client encoding but fails due to a + # connection error it will raise a PG::Error instance. Catching that + # would catch all errors (even those we don't want), so instead we + # check for the message of the error. 
+ error.message.start_with?('invalid encoding name:') + end + end + + def serialization_failure?(error) + if error.cause + serialization_failure?(error.cause) + else + error.is_a?(PG::TRSerializationFailure) + end + end + + private + + def ensure_caching! + host.enable_query_cache! unless host.query_cache_enabled + end + + def track_connection_role(connection, role) + @connection_db_roles[connection] = role + @connection_db_roles_count[connection] ||= 0 + @connection_db_roles_count[connection] += 1 + end + + def untrack_connection_role(connection) + return if connection.blank? || @connection_db_roles_count[connection].blank? + + @connection_db_roles_count[connection] -= 1 + if @connection_db_roles_count[connection] <= 0 + @connection_db_roles.delete(connection) + @connection_db_roles_count.delete(connection) + end + end + + def current_host_list + RequestStore[VALID_HOSTS_CACHE_KEY] || @host_list + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/logger.rb b/lib/gitlab/database/load_balancing/logger.rb new file mode 100644 index 00000000000..ee67ffcc99c --- /dev/null +++ b/lib/gitlab/database/load_balancing/logger.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + class Logger < ::Gitlab::JsonLogger + def self.file_name_noext + 'database_load_balancing' + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/rack_middleware.rb b/lib/gitlab/database/load_balancing/rack_middleware.rb new file mode 100644 index 00000000000..4734ff99bd3 --- /dev/null +++ b/lib/gitlab/database/load_balancing/rack_middleware.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Rack middleware to handle sticking when serving Rails requests. Grape + # API calls are handled separately as different API endpoints need to + # stick based on different objects. 
+ class RackMiddleware + STICK_OBJECT = 'load_balancing.stick_object' + + # Unsticks or continues sticking the current request. + # + # This method also updates the Rack environment so #call can later + # determine if we still need to stick or not. + # + # env - The Rack environment. + # namespace - The namespace to use for sticking. + # id - The identifier to use for sticking. + def self.stick_or_unstick(env, namespace, id) + return unless LoadBalancing.enable? + + Sticking.unstick_or_continue_sticking(namespace, id) + + env[STICK_OBJECT] ||= Set.new + env[STICK_OBJECT] << [namespace, id] + end + + def initialize(app) + @app = app + end + + def call(env) + # Ensure that any state that may have run before the first request + # doesn't linger around. + clear + + unstick_or_continue_sticking(env) + + result = @app.call(env) + + stick_if_necessary(env) + + result + ensure + clear + end + + # Determine if we need to stick based on currently available user data. + # + # Typically this code will only be reachable for Rails requests as + # Grape data is not yet available at this point. + def unstick_or_continue_sticking(env) + namespaces_and_ids = sticking_namespaces_and_ids(env) + + namespaces_and_ids.each do |namespace, id| + Sticking.unstick_or_continue_sticking(namespace, id) + end + end + + # Determine if we need to stick after handling a request. + def stick_if_necessary(env) + namespaces_and_ids = sticking_namespaces_and_ids(env) + + namespaces_and_ids.each do |namespace, id| + Sticking.stick_if_necessary(namespace, id) + end + end + + def clear + load_balancer.release_host + Session.clear_session + end + + def load_balancer + LoadBalancing.proxy.load_balancer + end + + # Determines the sticking namespace and identifier based on the Rack + # environment. + # + # For Rails requests this uses warden, but Grape and others have to + # manually set the right environment variable. 
+ def sticking_namespaces_and_ids(env) + warden = env['warden'] + + if warden && warden.user + [[:user, warden.user.id]] + elsif env[STICK_OBJECT].present? + env[STICK_OBJECT].to_a + else + [] + end + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/resolver.rb b/lib/gitlab/database/load_balancing/resolver.rb new file mode 100644 index 00000000000..a291080cc3d --- /dev/null +++ b/lib/gitlab/database/load_balancing/resolver.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +require 'net/dns' +require 'resolv' + +module Gitlab + module Database + module LoadBalancing + class Resolver + UnresolvableNameserverError = Class.new(StandardError) + + def initialize(nameserver) + @nameserver = nameserver + end + + def resolve + address = ip_address || ip_address_from_hosts_file || + ip_address_from_dns + + unless address + raise UnresolvableNameserverError, + "could not resolve #{@nameserver}" + end + + address + end + + private + + def ip_address + IPAddr.new(@nameserver) + rescue IPAddr::InvalidAddressError + end + + def ip_address_from_hosts_file + ip = Resolv::Hosts.new.getaddress(@nameserver) + IPAddr.new(ip) + rescue Resolv::ResolvError + end + + def ip_address_from_dns + answer = Net::DNS::Resolver.start(@nameserver, Net::DNS::A).answer + return if answer.empty? + + answer.first.address + rescue Net::DNS::Resolver::NoResponseError + raise UnresolvableNameserverError, "no response from DNS server(s)" + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/service_discovery.rb b/lib/gitlab/database/load_balancing/service_discovery.rb new file mode 100644 index 00000000000..9b42b25be1c --- /dev/null +++ b/lib/gitlab/database/load_balancing/service_discovery.rb @@ -0,0 +1,187 @@ +# frozen_string_literal: true + +require 'net/dns' +require 'resolv' + +module Gitlab + module Database + module LoadBalancing + # Service discovery of secondary database hosts. 
+ # + # Service discovery works by periodically looking up a DNS record. If the + # DNS record returns a new list of hosts, this class will update the load + # balancer with said hosts. Requests may continue to use the old hosts + # until they complete. + class ServiceDiscovery + attr_reader :interval, :record, :record_type, :disconnect_timeout + + MAX_SLEEP_ADJUSTMENT = 10 + + RECORD_TYPES = { + 'A' => Net::DNS::A, + 'SRV' => Net::DNS::SRV + }.freeze + + Address = Struct.new(:hostname, :port) do + def to_s + port ? "#{hostname}:#{port}" : hostname + end + + def <=>(other) + self.to_s <=> other.to_s + end + end + + # nameserver - The nameserver to use for DNS lookups. + # port - The port of the nameserver. + # record - The DNS record to look up for retrieving the secondaries. + # record_type - The type of DNS record to look up + # interval - The time to wait between lookups. + # disconnect_timeout - The time after which an old host should be + # forcefully disconnected. + # use_tcp - Use TCP instaed of UDP to look up resources + def initialize(nameserver:, port:, record:, record_type: 'A', interval: 60, disconnect_timeout: 120, use_tcp: false) + @nameserver = nameserver + @port = port + @record = record + @record_type = record_type_for(record_type) + @interval = interval + @disconnect_timeout = disconnect_timeout + @use_tcp = use_tcp + end + + def start + Thread.new do + loop do + interval = + begin + refresh_if_necessary + rescue StandardError => error + # Any exceptions that might occur should be reported to + # Sentry, instead of silently terminating this thread. + Gitlab::ErrorTracking.track_exception(error) + + Gitlab::AppLogger.error( + "Service discovery encountered an error: #{error.message}" + ) + + self.interval + end + + # We slightly randomize the sleep() interval. This should reduce + # the likelihood of _all_ processes refreshing at the same time, + # possibly putting unnecessary pressure on the DNS server. 
+ sleep(interval + rand(MAX_SLEEP_ADJUSTMENT)) + end + end + end + + # Refreshes the hosts, but only if the DNS record returned a new list of + # addresses. + # + # The return value is the amount of time (in seconds) to wait before + # checking the DNS record for any changes. + def refresh_if_necessary + interval, from_dns = addresses_from_dns + + current = addresses_from_load_balancer + + replace_hosts(from_dns) if from_dns != current + + interval + end + + # Replaces all the hosts in the load balancer with the new ones, + # disconnecting the old connections. + # + # addresses - An Array of Address structs to use for the new hosts. + def replace_hosts(addresses) + old_hosts = load_balancer.host_list.hosts + + load_balancer.host_list.hosts = addresses.map do |addr| + Host.new(addr.hostname, load_balancer, port: addr.port) + end + + # We must explicitly disconnect the old connections, otherwise we may + # leak database connections over time. For example, if a request + # started just before we added the new hosts it will use an old + # host/connection. While this connection will be checked in and out, + # it won't be explicitly disconnected. + old_hosts.each do |host| + host.disconnect!(disconnect_timeout) + end + end + + # Returns an Array containing: + # + # 1. The time to wait for the next check. + # 2. An array containing the hostnames of the DNS record. + def addresses_from_dns + response = resolver.search(record, record_type) + resources = response.answer + + addresses = + case record_type + when Net::DNS::A + addresses_from_a_record(resources) + when Net::DNS::SRV + addresses_from_srv_record(response) + end + + # Addresses are sorted so we can directly compare the old and new + # addresses, without having to use any additional data structures. + [new_wait_time_for(resources), addresses.sort] + end + + def new_wait_time_for(resources) + wait = resources.first&.ttl || interval + + # The preconfigured interval acts as a minimum amount of time to + # wait. 
+ wait < interval ? interval : wait + end + + def addresses_from_load_balancer + load_balancer.host_list.host_names_and_ports.map do |hostname, port| + Address.new(hostname, port) + end.sort + end + + def load_balancer + LoadBalancing.proxy.load_balancer + end + + def resolver + @resolver ||= Net::DNS::Resolver.new( + nameservers: Resolver.new(@nameserver).resolve, + port: @port, + use_tcp: @use_tcp + ) + end + + private + + def record_type_for(type) + RECORD_TYPES.fetch(type) do + raise(ArgumentError, "Unsupported record type: #{type}") + end + end + + def addresses_from_srv_record(response) + srv_resolver = SrvResolver.new(resolver, response.additional) + + response.answer.map do |r| + address = srv_resolver.address_for(r.host.to_s) + next unless address + + Address.new(address.to_s, r.port) + end.compact + end + + def addresses_from_a_record(resources) + resources.map { |r| Address.new(r.address.to_s) } + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/session.rb b/lib/gitlab/database/load_balancing/session.rb new file mode 100644 index 00000000000..3682c9265c2 --- /dev/null +++ b/lib/gitlab/database/load_balancing/session.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Tracking of load balancing state per user session. + # + # A session starts at the beginning of a request and ends once the request + # has been completed. Sessions can be used to keep track of what hosts + # should be used for queries. 
+ class Session + CACHE_KEY = :gitlab_load_balancer_session + + def self.current + RequestStore[CACHE_KEY] ||= new + end + + def self.clear_session + RequestStore.delete(CACHE_KEY) + end + + def self.without_sticky_writes(&block) + current.ignore_writes(&block) + end + + def initialize + @use_primary = false + @performed_write = false + @ignore_writes = false + @fallback_to_replicas_for_ambiguous_queries = false + @use_replicas_for_read_queries = false + end + + def use_primary? + @use_primary + end + + alias_method :using_primary?, :use_primary? + + def use_primary! + @use_primary = true + end + + def use_primary(&blk) + used_primary = @use_primary + @use_primary = true + yield + ensure + @use_primary = used_primary || @performed_write + end + + def ignore_writes(&block) + @ignore_writes = true + + yield + ensure + @ignore_writes = false + end + + # Indicates that the read SQL statements from anywhere inside this + # blocks should use a replica, regardless of the current primary + # stickiness or whether a write query is already performed in the + # current session. This interface is reserved mostly for performance + # purpose. This is a good tool to push expensive queries, which can + # tolerate the replica lags, to the replicas. + # + # Write and ambiguous queries inside this block are still handled by + # the primary. + def use_replicas_for_read_queries(&blk) + previous_flag = @use_replicas_for_read_queries + @use_replicas_for_read_queries = true + yield + ensure + @use_replicas_for_read_queries = previous_flag + end + + def use_replicas_for_read_queries? + @use_replicas_for_read_queries == true + end + + # Indicate that the ambiguous SQL statements from anywhere inside this + # block should use a replica. The ambiguous statements include: + # - Transactions. + # - Custom queries (via exec_query, execute, etc.) + # - In-flight connection configuration change (SET LOCAL statement_timeout = 5000) + # + # This is a weak enforcement. 
This helper integrates with + # primary stickiness; the primary is still used when: + # - The queries are about to write + # - The current session has already performed writes + # - The session prefers the primary, i.e. use_primary or use_primary! was called + def fallback_to_replicas_for_ambiguous_queries(&blk) + previous_flag = @fallback_to_replicas_for_ambiguous_queries + @fallback_to_replicas_for_ambiguous_queries = true + yield + ensure + @fallback_to_replicas_for_ambiguous_queries = previous_flag + end + + def fallback_to_replicas_for_ambiguous_queries? + @fallback_to_replicas_for_ambiguous_queries == true && !use_primary? && !performed_write? + end + + def write! + @performed_write = true + + return if @ignore_writes + + use_primary! + end + + def performed_write? + @performed_write + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/sidekiq_client_middleware.rb b/lib/gitlab/database/load_balancing/sidekiq_client_middleware.rb new file mode 100644 index 00000000000..524d69c00c0 --- /dev/null +++ b/lib/gitlab/database/load_balancing/sidekiq_client_middleware.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + class SidekiqClientMiddleware + def call(worker_class, job, _queue, _redis_pool) + worker_class = worker_class.to_s.safe_constantize + + mark_data_consistency_location(worker_class, job) + + yield + end + + private + + def mark_data_consistency_location(worker_class, job) + # Mailers can't be constantized + return unless worker_class + return unless worker_class.include?(::ApplicationWorker) + return unless worker_class.get_data_consistency_feature_flag_enabled? + + return if location_already_provided?(job) + + job['worker_data_consistency'] = worker_class.get_data_consistency + + return unless worker_class.utilizes_load_balancing_capabilities? + + if Session.current.use_primary?
+ job['database_write_location'] = load_balancer.primary_write_location + else + job['database_replica_location'] = load_balancer.host.database_replica_location + end + end + + def location_already_provided?(job) + job['database_replica_location'] || job['database_write_location'] + end + + def load_balancer + LoadBalancing.proxy.load_balancer + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb b/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb new file mode 100644 index 00000000000..9bd0adf8dbd --- /dev/null +++ b/lib/gitlab/database/load_balancing/sidekiq_server_middleware.rb @@ -0,0 +1,71 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + class SidekiqServerMiddleware + JobReplicaNotUpToDate = Class.new(StandardError) + + def call(worker, job, _queue) + if requires_primary?(worker.class, job) + Session.current.use_primary! + end + + yield + ensure + clear + end + + private + + def clear + load_balancer.release_host + Session.clear_session + end + + def requires_primary?(worker_class, job) + return true unless worker_class.include?(::ApplicationWorker) + return true unless worker_class.utilizes_load_balancing_capabilities? + return true unless worker_class.get_data_consistency_feature_flag_enabled? + + location = job['database_write_location'] || job['database_replica_location'] + + return true unless location + + job_data_consistency = worker_class.get_data_consistency + job[:data_consistency] = job_data_consistency.to_s + + if replica_caught_up?(location) + job[:database_chosen] = 'replica' + false + elsif job_data_consistency == :delayed && not_yet_retried?(job) + job[:database_chosen] = 'retry' + raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\ + " Replica was not up to date." 
+ else + job[:database_chosen] = 'primary' + true + end + end + + def not_yet_retried?(job) + # A `nil` `retry_count` indicates that this job was never retried; + # `0` indicates that this is the first retry + job['retry_count'].nil? + end + + def load_balancer + LoadBalancing.proxy.load_balancer + end + + def replica_caught_up?(location) + if Feature.enabled?(:sidekiq_load_balancing_rotate_up_to_date_replica) + load_balancer.select_up_to_date_host(location) + else + load_balancer.host.caught_up?(location) + end + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/srv_resolver.rb b/lib/gitlab/database/load_balancing/srv_resolver.rb new file mode 100644 index 00000000000..20da525f4d2 --- /dev/null +++ b/lib/gitlab/database/load_balancing/srv_resolver.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Hostnames returned in SRV records sometimes cannot be resolved by a local + # resolver; however, there's a possibility that their A/AAAA records are + # returned as part of the SRV query in the additional section, so we try + # to extract the IPs from there first, falling back to querying the + # hostnames' A/AAAA records one by one, using the same resolver that + # queried the SRV record.
+ class SrvResolver + include Gitlab::Utils::StrongMemoize + + attr_reader :resolver, :additional + + def initialize(resolver, additional) + @resolver = resolver + @additional = additional + end + + def address_for(host) + addresses_from_additional[host] || resolve_host(host) + end + + private + + def addresses_from_additional + strong_memoize(:addresses_from_additional) do + additional.each_with_object({}) do |rr, h| + h[rr.name] = rr.address if rr.is_a?(Net::DNS::RR::A) || rr.is_a?(Net::DNS::RR::AAAA) + end + end + end + + def resolve_host(host) + record = resolver.search(host, Net::DNS::ANY).answer.find do |rr| + rr.is_a?(Net::DNS::RR::A) || rr.is_a?(Net::DNS::RR::AAAA) + end + + record&.address + end + end + end + end +end diff --git a/lib/gitlab/database/load_balancing/sticking.rb b/lib/gitlab/database/load_balancing/sticking.rb new file mode 100644 index 00000000000..efbd7099300 --- /dev/null +++ b/lib/gitlab/database/load_balancing/sticking.rb @@ -0,0 +1,147 @@ +# frozen_string_literal: true + +module Gitlab + module Database + module LoadBalancing + # Module used for handling sticking connections to a primary, if + # necessary. + # + # ## Examples + # + # Sticking a user to the primary: + # + # Sticking.stick_if_necessary(:user, current_user.id) + # + # To unstick if possible, or continue using the primary otherwise: + # + # Sticking.unstick_or_continue_sticking(:user, current_user.id) + module Sticking + # The number of seconds after which a session should stop reading from + # the primary. + EXPIRATION = 30 + + # Sticks to the primary if a write was performed. + def self.stick_if_necessary(namespace, id) + return unless LoadBalancing.enable? + + stick(namespace, id) if Session.current.performed_write? 
+ end + + # Checks if we are caught-up with all the work + def self.all_caught_up?(namespace, id) + location = last_write_location_for(namespace, id) + + return true unless location + + load_balancer.all_caught_up?(location).tap do |caught_up| + unstick(namespace, id) if caught_up + end + end + + # Selects hosts that have caught up with the primary. This ensures + # atomic selection of the host to prevent the host list changing + # in another thread. + # + # Returns true if one host was selected. + def self.select_caught_up_replicas(namespace, id) + location = last_write_location_for(namespace, id) + + # Unlike all_caught_up?, we return false if no write location exists. + # We want to be sure we talk to a replica that has caught up for a specific + # write location. If no such location exists, err on the side of caution. + return false unless location + + load_balancer.select_caught_up_hosts(location).tap do |selected| + unstick(namespace, id) if selected + end + end + + # Sticks to the primary if necessary, otherwise unsticks an object (if + # it was previously stuck to the primary). + def self.unstick_or_continue_sticking(namespace, id) + Session.current.use_primary! unless all_caught_up?(namespace, id) + end + + # Select a replica that has caught up with the primary. If one has not been + # found, stick to the primary. + def self.select_valid_host(namespace, id) + replica_selected = select_caught_up_replicas(namespace, id) + + Session.current.use_primary! unless replica_selected + end + + # Starts sticking to the primary for the given namespace and id, using + # the latest WAL pointer from the primary. + def self.stick(namespace, id) + return unless LoadBalancing.enable? + + mark_primary_write_location(namespace, id) + Session.current.use_primary! + end + + def self.bulk_stick(namespace, ids) + return unless LoadBalancing.enable? 
+ + with_primary_write_location do |location| + ids.each do |id| + set_write_location_for(namespace, id, location) + end + end + + Session.current.use_primary! + end + + def self.with_primary_write_location + return unless LoadBalancing.configured? + + # Load balancing could be enabled for the Web application server, + # but it's not activated for Sidekiq. We should update Redis with + # the write location just in case load balancing is being used. + location = + if LoadBalancing.enable? + load_balancer.primary_write_location + else + Gitlab::Database.get_write_location(ActiveRecord::Base.connection) + end + + return if location.blank? + + yield(location) + end + + def self.mark_primary_write_location(namespace, id) + with_primary_write_location do |location| + set_write_location_for(namespace, id, location) + end + end + + # Stops sticking to the primary. + def self.unstick(namespace, id) + Gitlab::Redis::SharedState.with do |redis| + redis.del(redis_key_for(namespace, id)) + end + end + + def self.set_write_location_for(namespace, id, location) + Gitlab::Redis::SharedState.with do |redis| + redis.set(redis_key_for(namespace, id), location, ex: EXPIRATION) + end + end + + def self.last_write_location_for(namespace, id) + Gitlab::Redis::SharedState.with do |redis| + redis.get(redis_key_for(namespace, id)) + end + end + + def self.redis_key_for(namespace, id) + "database-load-balancing/write-location/#{namespace}/#{id}" + end + + def self.load_balancer + LoadBalancing.proxy.load_balancer + end + end + end + end +end |