summaryrefslogtreecommitdiff
path: root/lib/gitlab/sidekiq_cluster.rb
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2020-03-12 00:09:34 +0000
committerGitLab Bot <gitlab-bot@gitlab.com>2020-03-12 00:09:34 +0000
commit5781a4966047232d4725f9ee4769c4bd5aed9b26 (patch)
tree0ef2b81a40931ec51f8fdd5284ed9e47cf42a923 /lib/gitlab/sidekiq_cluster.rb
parent4d48b3cfcd74bcca0f0f305746f74cf7224dd78b (diff)
downloadgitlab-ce-5781a4966047232d4725f9ee4769c4bd5aed9b26.tar.gz
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'lib/gitlab/sidekiq_cluster.rb')
-rw-r--r--lib/gitlab/sidekiq_cluster.rb162
1 files changed, 162 insertions, 0 deletions
diff --git a/lib/gitlab/sidekiq_cluster.rb b/lib/gitlab/sidekiq_cluster.rb
new file mode 100644
index 00000000000..c19bef1389a
--- /dev/null
+++ b/lib/gitlab/sidekiq_cluster.rb
@@ -0,0 +1,162 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqCluster
+ # The signals that should terminate both the master and workers.
+ TERMINATE_SIGNALS = %i(INT TERM).freeze
+
+ # The signals that should simply be forwarded to the workers.
+ FORWARD_SIGNALS = %i(TTIN USR1 USR2 HUP).freeze
+
+ # Traps the given signals and yields the block whenever these signals are
+ # received.
+ #
+ # The block is passed the name of the signal.
+ #
+ # Example:
+ #
+ # trap_signals(%i(HUP TERM)) do |signal|
+ # ...
+ # end
+ def self.trap_signals(signals)
+ signals.each do |signal|
+ trap(signal) do
+ yield signal
+ end
+ end
+ end
+
+ def self.trap_terminate(&block)
+ trap_signals(TERMINATE_SIGNALS, &block)
+ end
+
+ def self.trap_forward(&block)
+ trap_signals(FORWARD_SIGNALS, &block)
+ end
+
+ def self.signal(pid, signal)
+ Process.kill(signal, pid)
+ true
+ rescue Errno::ESRCH
+ false
+ end
+
+ def self.signal_processes(pids, signal)
+ pids.each { |pid| signal(pid, signal) }
+ end
+
+ # Starts Sidekiq workers for the pairs of processes.
+ #
+ # Example:
+ #
+ # start([ ['foo'], ['bar', 'baz'] ], :production)
+ #
+ # This would start two Sidekiq processes: one processing "foo", and one
+ # processing "bar" and "baz". Each one is placed in its own process group.
+ #
+ # queues - An Array containing Arrays. Each sub Array should specify the
+ # queues to use for a single process.
+ #
+ # directory - The directory of the Rails application.
+ #
+ # Returns an Array containing the PIDs of the started processes.
+ def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 50, min_concurrency: 0, dryrun: false)
+ queues.map.with_index do |pair, index|
+ start_sidekiq(pair, env: env, directory: directory, max_concurrency: max_concurrency, min_concurrency: min_concurrency, worker_id: index, dryrun: dryrun)
+ end
+ end
+
+ # Starts a Sidekiq process that processes _only_ the given queues.
+ #
+ # Returns the PID of the started process.
+ def self.start_sidekiq(queues, env:, directory:, max_concurrency:, min_concurrency:, worker_id:, dryrun:)
+ counts = count_by_queue(queues)
+
+ cmd = %w[bundle exec sidekiq]
+ cmd << "-c #{self.concurrency(queues, min_concurrency, max_concurrency)}"
+ cmd << "-e#{env}"
+ cmd << "-gqueues: #{proc_details(counts)}"
+ cmd << "-r#{directory}"
+
+ counts.each do |queue, count|
+ cmd << "-q#{queue},#{count}"
+ end
+
+ if dryrun
+ puts "Sidekiq command: #{cmd}" # rubocop:disable Rails/Output
+ return
+ end
+
+ pid = Process.spawn(
+ { 'ENABLE_SIDEKIQ_CLUSTER' => '1',
+ 'SIDEKIQ_WORKER_ID' => worker_id.to_s },
+ *cmd,
+ pgroup: true,
+ err: $stderr,
+ out: $stdout
+ )
+
+ wait_async(pid)
+
+ pid
+ end
+
+ def self.count_by_queue(queues)
+ queues.each_with_object(Hash.new(0)) { |element, hash| hash[element] += 1 }
+ end
+
+ def self.proc_details(counts)
+ counts.map do |queue, count|
+ if count == 1
+ queue
+ else
+ "#{queue} (#{count})"
+ end
+ end.join(', ')
+ end
+
+ def self.concurrency(queues, min_concurrency, max_concurrency)
+ concurrency_from_queues = queues.length + 1
+ max = max_concurrency.positive? ? max_concurrency : concurrency_from_queues
+ min = [min_concurrency, max].min
+
+ concurrency_from_queues.clamp(min, max)
+ end
+
+ # Waits for the given process to complete using a separate thread.
+ def self.wait_async(pid)
+ Thread.new do
+ Process.wait(pid) rescue Errno::ECHILD
+ end
+ end
+
+ # Returns true if all the processes are alive.
+ def self.all_alive?(pids)
+ pids.each do |pid|
+ return false unless process_alive?(pid)
+ end
+
+ true
+ end
+
+ def self.any_alive?(pids)
+ pids_alive(pids).any?
+ end
+
+ def self.pids_alive(pids)
+ pids.select { |pid| process_alive?(pid) }
+ end
+
+ def self.process_alive?(pid)
+ # Signal 0 tests whether the process exists and we have access to send signals
+ # but is otherwise a noop (doesn't actually send a signal to the process)
+ signal(pid, 0)
+ end
+
+ def self.write_pid(path)
+ File.open(path, 'w') do |handle|
+ handle.write(Process.pid.to_s)
+ end
+ end
+ end
+end