summaryrefslogtreecommitdiff
path: root/sidekiq_cluster
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2021-11-18 13:16:36 +0000
committerGitLab Bot <gitlab-bot@gitlab.com>2021-11-18 13:16:36 +0000
commit311b0269b4eb9839fa63f80c8d7a58f32b8138a0 (patch)
tree07e7870bca8aed6d61fdcc810731c50d2c40af47 /sidekiq_cluster
parent27909cef6c4170ed9205afa7426b8d3de47cbb0c (diff)
downloadgitlab-ce-311b0269b4eb9839fa63f80c8d7a58f32b8138a0.tar.gz
Add latest changes from gitlab-org/gitlab@14-5-stable-eev14.5.0-rc42
Diffstat (limited to 'sidekiq_cluster')
-rw-r--r--sidekiq_cluster/cli.rb224
-rw-r--r--sidekiq_cluster/dependencies.rb6
-rw-r--r--sidekiq_cluster/sidekiq_cluster.rb184
3 files changed, 414 insertions, 0 deletions
diff --git a/sidekiq_cluster/cli.rb b/sidekiq_cluster/cli.rb
new file mode 100644
index 00000000000..55b4521d37d
--- /dev/null
+++ b/sidekiq_cluster/cli.rb
@@ -0,0 +1,224 @@
+# frozen_string_literal: true
+
+require 'optparse'
+require 'logger'
+require 'time'
+
+# In environments where code is preloaded and cached such as `spring`,
+# we may run into "already initialized" warnings, hence the check.
+require_relative '../lib/gitlab' unless Object.const_defined?('Gitlab')
+require_relative '../lib/gitlab/utils'
+require_relative '../lib/gitlab/sidekiq_config/cli_methods'
+require_relative '../lib/gitlab/sidekiq_config/worker_matcher'
+require_relative '../lib/gitlab/sidekiq_logging/json_formatter'
+require_relative 'sidekiq_cluster'
+
+module Gitlab
+ module SidekiqCluster
+ class CLI
+ CommandError = Class.new(StandardError)
+
+ def initialize(log_output = $stderr)
+ # As recommended by https://github.com/mperham/sidekiq/wiki/Advanced-Options#concurrency
+ @max_concurrency = 50
+ @min_concurrency = 0
+ @environment = ENV['RAILS_ENV'] || 'development'
+ @pid = nil
+ @interval = 5
+ @alive = true
+ @processes = []
+ @logger = Logger.new(log_output)
+ @logger.formatter = ::Gitlab::SidekiqLogging::JSONFormatter.new
+ @rails_path = Dir.pwd
+ @dryrun = false
+ @list_queues = false
+ end
+
+ def run(argv = ARGV)
+ if argv.empty?
+ raise CommandError,
+ 'You must specify at least one queue to start a worker for'
+ end
+
+ option_parser.parse!(argv)
+
+ if @dryrun && @list_queues
+ raise CommandError,
+ 'The --dryrun and --list-queues options are mutually exclusive'
+ end
+
+ worker_metadatas = SidekiqConfig::CliMethods.worker_metadatas(@rails_path)
+ worker_queues = SidekiqConfig::CliMethods.worker_queues(@rails_path)
+
+ queue_groups = argv.map do |queues_or_query_string|
+ if queues_or_query_string =~ /[\r\n]/
+ raise CommandError,
+ 'The queue arguments cannot contain newlines'
+ end
+
+ next worker_queues if queues_or_query_string == SidekiqConfig::WorkerMatcher::WILDCARD_MATCH
+
+ # When using the queue query syntax, we treat each queue group
+ # as a worker attribute query, and resolve the queues for the
+ # queue group using this query.
+
+ if @queue_selector
+ SidekiqConfig::CliMethods.query_queues(queues_or_query_string, worker_metadatas)
+ else
+ SidekiqConfig::CliMethods.expand_queues(queues_or_query_string.split(','), worker_queues)
+ end
+ end
+
+ if @negate_queues
+ queue_groups.map! { |queues| worker_queues - queues }
+ end
+
+ if queue_groups.all?(&:empty?)
+ raise CommandError,
+ 'No queues found, you must select at least one queue'
+ end
+
+ if @list_queues
+ puts queue_groups.map(&:sort) # rubocop:disable Rails/Output
+
+ return
+ end
+
+ unless @dryrun
+ @logger.info("Starting cluster with #{queue_groups.length} processes")
+ end
+
+ @processes = SidekiqCluster.start(
+ queue_groups,
+ env: @environment,
+ directory: @rails_path,
+ max_concurrency: @max_concurrency,
+ min_concurrency: @min_concurrency,
+ dryrun: @dryrun,
+ timeout: soft_timeout_seconds
+ )
+
+ return if @dryrun
+
+ write_pid
+ trap_signals
+ start_loop
+ end
+
+ def write_pid
+ SidekiqCluster.write_pid(@pid) if @pid
+ end
+
+ def soft_timeout_seconds
+ @soft_timeout_seconds || DEFAULT_SOFT_TIMEOUT_SECONDS
+ end
+
+ # The amount of time it'll wait for killing the alive Sidekiq processes.
+ def hard_timeout_seconds
+ soft_timeout_seconds + DEFAULT_HARD_TIMEOUT_SECONDS
+ end
+
+ def monotonic_time
+ Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second)
+ end
+
+ def continue_waiting?(deadline)
+ SidekiqCluster.any_alive?(@processes) && monotonic_time < deadline
+ end
+
+ def hard_stop_stuck_pids
+ SidekiqCluster.signal_processes(SidekiqCluster.pids_alive(@processes), "-KILL")
+ end
+
+ def wait_for_termination
+ deadline = monotonic_time + hard_timeout_seconds
+ sleep(CHECK_TERMINATE_INTERVAL_SECONDS) while continue_waiting?(deadline)
+
+ hard_stop_stuck_pids
+ end
+
+ def trap_signals
+ SidekiqCluster.trap_terminate do |signal|
+ @alive = false
+ SidekiqCluster.signal_processes(@processes, signal)
+ wait_for_termination
+ end
+
+ SidekiqCluster.trap_forward do |signal|
+ SidekiqCluster.signal_processes(@processes, signal)
+ end
+ end
+
+ def start_loop
+ while @alive
+ sleep(@interval)
+
+ unless SidekiqCluster.all_alive?(@processes)
+ # If a child process died we'll just terminate the whole cluster. It's up to
+ # runit and such to then restart the cluster.
+ @logger.info('A worker terminated, shutting down the cluster')
+
+ SidekiqCluster.signal_processes(@processes, :TERM)
+ break
+ end
+ end
+ end
+
+ def option_parser
+ OptionParser.new do |opt|
+ opt.banner = "#{File.basename(__FILE__)} [QUEUE,QUEUE] [QUEUE] ... [OPTIONS]"
+
+ opt.separator "\nOptions:\n"
+
+ opt.on('-h', '--help', 'Shows this help message') do
+ abort opt.to_s
+ end
+
+ opt.on('-m', '--max-concurrency INT', 'Maximum threads to use with Sidekiq (default: 50, 0 to disable)') do |int|
+ @max_concurrency = int.to_i
+ end
+
+ opt.on('--min-concurrency INT', 'Minimum threads to use with Sidekiq (default: 0)') do |int|
+ @min_concurrency = int.to_i
+ end
+
+ opt.on('-e', '--environment ENV', 'The application environment') do |env|
+ @environment = env
+ end
+
+ opt.on('-P', '--pidfile PATH', 'Path to the PID file') do |pid|
+ @pid = pid
+ end
+
+ opt.on('-r', '--require PATH', 'Location of the Rails application') do |path|
+ @rails_path = path
+ end
+
+ opt.on('--queue-selector', 'Run workers based on the provided selector') do |queue_selector|
+ @queue_selector = queue_selector
+ end
+
+ opt.on('-n', '--negate', 'Run workers for all queues in sidekiq_queues.yml except the given ones') do
+ @negate_queues = true
+ end
+
+ opt.on('-i', '--interval INT', 'The number of seconds to wait between worker checks') do |int|
+ @interval = int.to_i
+ end
+
+ opt.on('-t', '--timeout INT', 'Graceful timeout for all running processes') do |timeout|
+ @soft_timeout_seconds = timeout.to_i
+ end
+
+ opt.on('-d', '--dryrun', 'Print commands that would be run without this flag, and quit') do |int|
+ @dryrun = true
+ end
+
+ opt.on('--list-queues', 'List matching queues, and quit') do |int|
+ @list_queues = true
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/sidekiq_cluster/dependencies.rb b/sidekiq_cluster/dependencies.rb
new file mode 100644
index 00000000000..91e91475f15
--- /dev/null
+++ b/sidekiq_cluster/dependencies.rb
@@ -0,0 +1,6 @@
+# rubocop:disable Naming/FileName
+# frozen_string_literal: true
+
+require 'shellwords'
+
+# rubocop:enable Naming/FileName
diff --git a/sidekiq_cluster/sidekiq_cluster.rb b/sidekiq_cluster/sidekiq_cluster.rb
new file mode 100644
index 00000000000..49478ba740d
--- /dev/null
+++ b/sidekiq_cluster/sidekiq_cluster.rb
@@ -0,0 +1,184 @@
+# frozen_string_literal: true
+
+require_relative 'dependencies'
+
+module Gitlab
+ module SidekiqCluster
+ CHECK_TERMINATE_INTERVAL_SECONDS = 1
+
+ # How long to wait when asking for a clean termination.
+ # It maps the Sidekiq default timeout:
+ # https://github.com/mperham/sidekiq/wiki/Signals#term
+ #
+ # This value is passed to Sidekiq's `-t` if none
+ # is given through arguments.
+ DEFAULT_SOFT_TIMEOUT_SECONDS = 25
+
+ # After surpassing the soft timeout.
+ DEFAULT_HARD_TIMEOUT_SECONDS = 5
+
+ # The signals that should terminate both the master and workers.
+ TERMINATE_SIGNALS = %i(INT TERM).freeze
+
+ # The signals that should simply be forwarded to the workers.
+ FORWARD_SIGNALS = %i(TTIN USR1 USR2 HUP).freeze
+
+ # Traps the given signals and yields the block whenever these signals are
+ # received.
+ #
+ # The block is passed the name of the signal.
+ #
+ # Example:
+ #
+ # trap_signals(%i(HUP TERM)) do |signal|
+ # ...
+ # end
+ def self.trap_signals(signals)
+ signals.each do |signal|
+ trap(signal) do
+ yield signal
+ end
+ end
+ end
+
+ def self.trap_terminate(&block)
+ trap_signals(TERMINATE_SIGNALS, &block)
+ end
+
+ def self.trap_forward(&block)
+ trap_signals(FORWARD_SIGNALS, &block)
+ end
+
+ def self.signal(pid, signal)
+ Process.kill(signal, pid)
+ true
+ rescue Errno::ESRCH
+ false
+ end
+
+ def self.signal_processes(pids, signal)
+ pids.each { |pid| signal(pid, signal) }
+ end
+
+ # Starts Sidekiq workers for the pairs of processes.
+ #
+ # Example:
+ #
+ # start([ ['foo'], ['bar', 'baz'] ], :production)
+ #
+ # This would start two Sidekiq processes: one processing "foo", and one
+ # processing "bar" and "baz". Each one is placed in its own process group.
+ #
+ # queues - An Array containing Arrays. Each sub Array should specify the
+ # queues to use for a single process.
+ #
+ # directory - The directory of the Rails application.
+ #
+ # Returns an Array containing the PIDs of the started processes.
+ def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 50, min_concurrency: 0, timeout: DEFAULT_SOFT_TIMEOUT_SECONDS, dryrun: false)
+ queues.map.with_index do |pair, index|
+ start_sidekiq(pair, env: env,
+ directory: directory,
+ max_concurrency: max_concurrency,
+ min_concurrency: min_concurrency,
+ worker_id: index,
+ timeout: timeout,
+ dryrun: dryrun)
+ end
+ end
+
+ # Starts a Sidekiq process that processes _only_ the given queues.
+ #
+ # Returns the PID of the started process.
+ def self.start_sidekiq(queues, env:, directory:, max_concurrency:, min_concurrency:, worker_id:, timeout:, dryrun:)
+ counts = count_by_queue(queues)
+
+ cmd = %w[bundle exec sidekiq]
+ cmd << "-c#{self.concurrency(queues, min_concurrency, max_concurrency)}"
+ cmd << "-e#{env}"
+ cmd << "-t#{timeout}"
+ cmd << "-gqueues:#{proc_details(counts)}"
+ cmd << "-r#{directory}"
+
+ counts.each do |queue, count|
+ cmd << "-q#{queue},#{count}"
+ end
+
+ if dryrun
+ puts Shellwords.join(cmd) # rubocop:disable Rails/Output
+ return
+ end
+
+ pid = Process.spawn(
+ { 'ENABLE_SIDEKIQ_CLUSTER' => '1',
+ 'SIDEKIQ_WORKER_ID' => worker_id.to_s },
+ *cmd,
+ pgroup: true,
+ err: $stderr,
+ out: $stdout
+ )
+
+ wait_async(pid)
+
+ pid
+ end
+
+ def self.count_by_queue(queues)
+ queues.tally
+ end
+
+ def self.proc_details(counts)
+ counts.map do |queue, count|
+ if count == 1
+ queue
+ else
+ "#{queue} (#{count})"
+ end
+ end.join(',')
+ end
+
+ def self.concurrency(queues, min_concurrency, max_concurrency)
+ concurrency_from_queues = queues.length + 1
+ max = max_concurrency > 0 ? max_concurrency : concurrency_from_queues
+ min = [min_concurrency, max].min
+
+ concurrency_from_queues.clamp(min, max)
+ end
+
+ # Waits for the given process to complete using a separate thread.
+ def self.wait_async(pid)
+ Thread.new do
+ Process.wait(pid) rescue Errno::ECHILD
+ end
+ end
+
+ # Returns true if all the processes are alive.
+ def self.all_alive?(pids)
+ pids.each do |pid|
+ return false unless process_alive?(pid)
+ end
+
+ true
+ end
+
+ def self.any_alive?(pids)
+ pids_alive(pids).any?
+ end
+
+ def self.pids_alive(pids)
+ pids.select { |pid| process_alive?(pid) }
+ end
+
+ def self.process_alive?(pid)
+ # Signal 0 tests whether the process exists and we have access to send signals
+ # but is otherwise a noop (doesn't actually send a signal to the process)
+ signal(pid, 0)
+ end
+
+ def self.write_pid(path)
+ File.open(path, 'w') do |handle|
+ handle.write(Process.pid.to_s)
+ end
+ end
+ end
+end