diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2021-11-18 13:16:36 +0000 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2021-11-18 13:16:36 +0000 |
commit | 311b0269b4eb9839fa63f80c8d7a58f32b8138a0 (patch) | |
tree | 07e7870bca8aed6d61fdcc810731c50d2c40af47 /sidekiq_cluster | |
parent | 27909cef6c4170ed9205afa7426b8d3de47cbb0c (diff) | |
download | gitlab-ce-311b0269b4eb9839fa63f80c8d7a58f32b8138a0.tar.gz |
Add latest changes from gitlab-org/gitlab@14-5-stable-eev14.5.0-rc42
Diffstat (limited to 'sidekiq_cluster')
-rw-r--r-- | sidekiq_cluster/cli.rb | 224 | ||||
-rw-r--r-- | sidekiq_cluster/dependencies.rb | 6 | ||||
-rw-r--r-- | sidekiq_cluster/sidekiq_cluster.rb | 184 |
3 files changed, 414 insertions, 0 deletions
diff --git a/sidekiq_cluster/cli.rb b/sidekiq_cluster/cli.rb new file mode 100644 index 00000000000..55b4521d37d --- /dev/null +++ b/sidekiq_cluster/cli.rb @@ -0,0 +1,224 @@ +# frozen_string_literal: true + +require 'optparse' +require 'logger' +require 'time' + +# In environments where code is preloaded and cached such as `spring`, +# we may run into "already initialized" warnings, hence the check. +require_relative '../lib/gitlab' unless Object.const_defined?('Gitlab') +require_relative '../lib/gitlab/utils' +require_relative '../lib/gitlab/sidekiq_config/cli_methods' +require_relative '../lib/gitlab/sidekiq_config/worker_matcher' +require_relative '../lib/gitlab/sidekiq_logging/json_formatter' +require_relative 'sidekiq_cluster' + +module Gitlab + module SidekiqCluster + class CLI + CommandError = Class.new(StandardError) + + def initialize(log_output = $stderr) + # As recommended by https://github.com/mperham/sidekiq/wiki/Advanced-Options#concurrency + @max_concurrency = 50 + @min_concurrency = 0 + @environment = ENV['RAILS_ENV'] || 'development' + @pid = nil + @interval = 5 + @alive = true + @processes = [] + @logger = Logger.new(log_output) + @logger.formatter = ::Gitlab::SidekiqLogging::JSONFormatter.new + @rails_path = Dir.pwd + @dryrun = false + @list_queues = false + end + + def run(argv = ARGV) + if argv.empty? + raise CommandError, + 'You must specify at least one queue to start a worker for' + end + + option_parser.parse!(argv) + + if @dryrun && @list_queues + raise CommandError, + 'The --dryrun and --list-queues options are mutually exclusive' + end + + worker_metadatas = SidekiqConfig::CliMethods.worker_metadatas(@rails_path) + worker_queues = SidekiqConfig::CliMethods.worker_queues(@rails_path) + + queue_groups = argv.map do |queues_or_query_string| + if queues_or_query_string =~ /[\r\n]/ + raise CommandError, + 'The queue arguments cannot contain newlines' + end + + next worker_queues if queues_or_query_string == SidekiqConfig::WorkerMatcher::WILDCARD_MATCH + + # When using the queue query syntax, we treat each queue group + # as a worker attribute query, and resolve the queues for the + # queue group using this query. + + if @queue_selector + SidekiqConfig::CliMethods.query_queues(queues_or_query_string, worker_metadatas) + else + SidekiqConfig::CliMethods.expand_queues(queues_or_query_string.split(','), worker_queues) + end + end + + if @negate_queues + queue_groups.map! { |queues| worker_queues - queues } + end + + if queue_groups.all?(&:empty?) + raise CommandError, + 'No queues found, you must select at least one queue' + end + + if @list_queues + puts queue_groups.map(&:sort) # rubocop:disable Rails/Output + + return + end + + unless @dryrun + @logger.info("Starting cluster with #{queue_groups.length} processes") + end + + @processes = SidekiqCluster.start( + queue_groups, + env: @environment, + directory: @rails_path, + max_concurrency: @max_concurrency, + min_concurrency: @min_concurrency, + dryrun: @dryrun, + timeout: soft_timeout_seconds + ) + + return if @dryrun + + write_pid + trap_signals + start_loop + end + + def write_pid + SidekiqCluster.write_pid(@pid) if @pid + end + + def soft_timeout_seconds + @soft_timeout_seconds || DEFAULT_SOFT_TIMEOUT_SECONDS + end + + # The amount of time it'll wait for killing the alive Sidekiq processes. + def hard_timeout_seconds + soft_timeout_seconds + DEFAULT_HARD_TIMEOUT_SECONDS + end + + def monotonic_time + Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second) + end + + def continue_waiting?(deadline) + SidekiqCluster.any_alive?(@processes) && monotonic_time < deadline + end + + def hard_stop_stuck_pids + SidekiqCluster.signal_processes(SidekiqCluster.pids_alive(@processes), "-KILL") + end + + def wait_for_termination + deadline = monotonic_time + hard_timeout_seconds + sleep(CHECK_TERMINATE_INTERVAL_SECONDS) while continue_waiting?(deadline) + + hard_stop_stuck_pids + end + + def trap_signals + SidekiqCluster.trap_terminate do |signal| + @alive = false + SidekiqCluster.signal_processes(@processes, signal) + wait_for_termination + end + + SidekiqCluster.trap_forward do |signal| + SidekiqCluster.signal_processes(@processes, signal) + end + end + + def start_loop + while @alive + sleep(@interval) + + unless SidekiqCluster.all_alive?(@processes) + # If a child process died we'll just terminate the whole cluster. It's up to + # runit and such to then restart the cluster. + @logger.info('A worker terminated, shutting down the cluster') + + SidekiqCluster.signal_processes(@processes, :TERM) + break + end + end + end + + def option_parser + OptionParser.new do |opt| + opt.banner = "#{File.basename(__FILE__)} [QUEUE,QUEUE] [QUEUE] ... [OPTIONS]" + + opt.separator "\nOptions:\n" + + opt.on('-h', '--help', 'Shows this help message') do + abort opt.to_s + end + + opt.on('-m', '--max-concurrency INT', 'Maximum threads to use with Sidekiq (default: 50, 0 to disable)') do |int| + @max_concurrency = int.to_i + end + + opt.on('--min-concurrency INT', 'Minimum threads to use with Sidekiq (default: 0)') do |int| + @min_concurrency = int.to_i + end + + opt.on('-e', '--environment ENV', 'The application environment') do |env| + @environment = env + end + + opt.on('-P', '--pidfile PATH', 'Path to the PID file') do |pid| + @pid = pid + end + + opt.on('-r', '--require PATH', 'Location of the Rails application') do |path| + @rails_path = path + end + + opt.on('--queue-selector', 'Run workers based on the provided selector') do |queue_selector| + @queue_selector = queue_selector + end + + opt.on('-n', '--negate', 'Run workers for all queues in sidekiq_queues.yml except the given ones') do + @negate_queues = true + end + + opt.on('-i', '--interval INT', 'The number of seconds to wait between worker checks') do |int| + @interval = int.to_i + end + + opt.on('-t', '--timeout INT', 'Graceful timeout for all running processes') do |timeout| + @soft_timeout_seconds = timeout.to_i + end + + opt.on('-d', '--dryrun', 'Print commands that would be run without this flag, and quit') do |int| + @dryrun = true + end + + opt.on('--list-queues', 'List matching queues, and quit') do |int| + @list_queues = true + end + end + end + end + end +end diff --git a/sidekiq_cluster/dependencies.rb b/sidekiq_cluster/dependencies.rb new file mode 100644 index 00000000000..91e91475f15 --- /dev/null +++ b/sidekiq_cluster/dependencies.rb @@ -0,0 +1,6 @@ +# rubocop:disable Naming/FileName +# frozen_string_literal: true + +require 'shellwords' + +# rubocop:enable Naming/FileName diff --git a/sidekiq_cluster/sidekiq_cluster.rb b/sidekiq_cluster/sidekiq_cluster.rb new file mode 100644 index 00000000000..49478ba740d --- /dev/null +++ b/sidekiq_cluster/sidekiq_cluster.rb @@ -0,0 +1,184 @@ +# frozen_string_literal: true + +require_relative 'dependencies' + +module Gitlab + module SidekiqCluster + CHECK_TERMINATE_INTERVAL_SECONDS = 1 + + # How long to wait when asking for a clean termination. + # It maps the Sidekiq default timeout: + # https://github.com/mperham/sidekiq/wiki/Signals#term + # + # This value is passed to Sidekiq's `-t` if none + # is given through arguments. + DEFAULT_SOFT_TIMEOUT_SECONDS = 25 + + # After surpassing the soft timeout. + DEFAULT_HARD_TIMEOUT_SECONDS = 5 + + # The signals that should terminate both the master and workers. + TERMINATE_SIGNALS = %i(INT TERM).freeze + + # The signals that should simply be forwarded to the workers. + FORWARD_SIGNALS = %i(TTIN USR1 USR2 HUP).freeze + + # Traps the given signals and yields the block whenever these signals are + # received. + # + # The block is passed the name of the signal. + # + # Example: + # + # trap_signals(%i(HUP TERM)) do |signal| + # ... + # end + def self.trap_signals(signals) + signals.each do |signal| + trap(signal) do + yield signal + end + end + end + + def self.trap_terminate(&block) + trap_signals(TERMINATE_SIGNALS, &block) + end + + def self.trap_forward(&block) + trap_signals(FORWARD_SIGNALS, &block) + end + + def self.signal(pid, signal) + Process.kill(signal, pid) + true + rescue Errno::ESRCH + false + end + + def self.signal_processes(pids, signal) + pids.each { |pid| signal(pid, signal) } + end + + # Starts Sidekiq workers for the pairs of processes. + # + # Example: + # + # start([ ['foo'], ['bar', 'baz'] ], :production) + # + # This would start two Sidekiq processes: one processing "foo", and one + # processing "bar" and "baz". Each one is placed in its own process group. + # + # queues - An Array containing Arrays. Each sub Array should specify the + # queues to use for a single process. + # + # directory - The directory of the Rails application. + # + # Returns an Array containing the PIDs of the started processes. + def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 50, min_concurrency: 0, timeout: DEFAULT_SOFT_TIMEOUT_SECONDS, dryrun: false) + queues.map.with_index do |pair, index| + start_sidekiq(pair, env: env, + directory: directory, + max_concurrency: max_concurrency, + min_concurrency: min_concurrency, + worker_id: index, + timeout: timeout, + dryrun: dryrun) + end + end + + # Starts a Sidekiq process that processes _only_ the given queues. + # + # Returns the PID of the started process. + def self.start_sidekiq(queues, env:, directory:, max_concurrency:, min_concurrency:, worker_id:, timeout:, dryrun:) + counts = count_by_queue(queues) + + cmd = %w[bundle exec sidekiq] + cmd << "-c#{self.concurrency(queues, min_concurrency, max_concurrency)}" + cmd << "-e#{env}" + cmd << "-t#{timeout}" + cmd << "-gqueues:#{proc_details(counts)}" + cmd << "-r#{directory}" + + counts.each do |queue, count| + cmd << "-q#{queue},#{count}" + end + + if dryrun + puts Shellwords.join(cmd) # rubocop:disable Rails/Output + return + end + + pid = Process.spawn( + { 'ENABLE_SIDEKIQ_CLUSTER' => '1', + 'SIDEKIQ_WORKER_ID' => worker_id.to_s }, + *cmd, + pgroup: true, + err: $stderr, + out: $stdout + ) + + wait_async(pid) + + pid + end + + def self.count_by_queue(queues) + queues.tally + end + + def self.proc_details(counts) + counts.map do |queue, count| + if count == 1 + queue + else + "#{queue} (#{count})" + end + end.join(',') + end + + def self.concurrency(queues, min_concurrency, max_concurrency) + concurrency_from_queues = queues.length + 1 + max = max_concurrency > 0 ? max_concurrency : concurrency_from_queues + min = [min_concurrency, max].min + + concurrency_from_queues.clamp(min, max) + end + + # Waits for the given process to complete using a separate thread. + def self.wait_async(pid) + Thread.new do + Process.wait(pid) rescue Errno::ECHILD + end + end + + # Returns true if all the processes are alive. + def self.all_alive?(pids) + pids.each do |pid| + return false unless process_alive?(pid) + end + + true + end + + def self.any_alive?(pids) + pids_alive(pids).any? + end + + def self.pids_alive(pids) + pids.select { |pid| process_alive?(pid) } + end + + def self.process_alive?(pid) + # Signal 0 tests whether the process exists and we have access to send signals + # but is otherwise a noop (doesn't actually send a signal to the process) + signal(pid, 0) + end + + def self.write_pid(path) + File.open(path, 'w') do |handle| + handle.write(Process.pid.to_s) + end + end + end +end |