summaryrefslogtreecommitdiff
path: root/sidekiq_cluster
diff options
context:
space:
mode:
Diffstat (limited to 'sidekiq_cluster')
-rw-r--r--sidekiq_cluster/cli.rb109
-rw-r--r--sidekiq_cluster/sidekiq_cluster.rb10
2 files changed, 41 insertions, 78 deletions
diff --git a/sidekiq_cluster/cli.rb b/sidekiq_cluster/cli.rb
index 2feb77601b8..e04f5ab1d42 100644
--- a/sidekiq_cluster/cli.rb
+++ b/sidekiq_cluster/cli.rb
@@ -13,7 +13,7 @@ require_relative '../lib/gitlab/utils'
require_relative '../lib/gitlab/sidekiq_config/cli_methods'
require_relative '../lib/gitlab/sidekiq_config/worker_matcher'
require_relative '../lib/gitlab/sidekiq_logging/json_formatter'
-require_relative '../lib/gitlab/process_management'
+require_relative '../metrics_server/dependencies'
require_relative '../metrics_server/metrics_server'
require_relative 'sidekiq_cluster'
@@ -38,8 +38,7 @@ module Gitlab
@metrics_dir = ENV["prometheus_multiproc_dir"] || File.absolute_path("tmp/prometheus_multiproc_dir/sidekiq")
@pid = nil
@interval = 5
- @alive = true
- @processes = []
+ @soft_timeout_seconds = DEFAULT_SOFT_TIMEOUT_SECONDS
@logger = Logger.new(log_output)
@logger.formatter = ::Gitlab::SidekiqLogging::JSONFormatter.new
@rails_path = Dir.pwd
@@ -103,95 +102,64 @@ module Gitlab
@logger.info("Starting cluster with #{queue_groups.length} processes")
end
- start_metrics_server(wipe_metrics_dir: true)
+ start_and_supervise_workers(queue_groups)
+ end
- @processes = SidekiqCluster.start(
+ def start_and_supervise_workers(queue_groups)
+ worker_pids = SidekiqCluster.start(
queue_groups,
env: @environment,
directory: @rails_path,
max_concurrency: @max_concurrency,
min_concurrency: @min_concurrency,
dryrun: @dryrun,
- timeout: soft_timeout_seconds
+ timeout: @soft_timeout_seconds
)
return if @dryrun
- write_pid
- trap_signals
- start_loop
- end
-
- def write_pid
ProcessManagement.write_pid(@pid) if @pid
- end
-
- def soft_timeout_seconds
- @soft_timeout_seconds || DEFAULT_SOFT_TIMEOUT_SECONDS
- end
-
- # The amount of time it'll wait for killing the alive Sidekiq processes.
- def hard_timeout_seconds
- soft_timeout_seconds + DEFAULT_HARD_TIMEOUT_SECONDS
- end
-
- def monotonic_time
- Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second)
- end
-
- def continue_waiting?(deadline)
- ProcessManagement.any_alive?(@processes) && monotonic_time < deadline
- end
-
- def hard_stop_stuck_pids
- ProcessManagement.signal_processes(ProcessManagement.pids_alive(@processes), "-KILL")
- end
-
- def wait_for_termination
- deadline = monotonic_time + hard_timeout_seconds
- sleep(CHECK_TERMINATE_INTERVAL_SECONDS) while continue_waiting?(deadline)
- hard_stop_stuck_pids
- end
-
- def trap_signals
- ProcessManagement.trap_signals(TERMINATE_SIGNALS) do |signal|
- @alive = false
- ProcessManagement.signal_processes(@processes, signal)
- wait_for_termination
- end
-
- ProcessManagement.trap_signals(FORWARD_SIGNALS) do |signal|
- ProcessManagement.signal_processes(@processes, signal)
- end
- end
+ supervisor = SidekiqProcessSupervisor.instance(
+ health_check_interval_seconds: @interval,
+ terminate_timeout_seconds: @soft_timeout_seconds + TIMEOUT_GRACE_PERIOD_SECONDS,
+ term_signals: TERMINATE_SIGNALS,
+ forwarded_signals: FORWARD_SIGNALS,
+ synchronous: true
+ )
- def start_loop
- while @alive
- sleep(@interval)
+ metrics_server_pid = start_metrics_server
- if metrics_server_enabled? && ProcessManagement.process_died?(@metrics_server_pid)
- @logger.warn('Metrics server went away')
- start_metrics_server(wipe_metrics_dir: false)
- end
+ all_pids = worker_pids + Array(metrics_server_pid)
- unless ProcessManagement.all_alive?(@processes)
- # If a child process died we'll just terminate the whole cluster. It's up to
- # runit and such to then restart the cluster.
+ supervisor.supervise(all_pids) do |dead_pids|
+ # If we're not in the process of shutting down the cluster,
+ # and the metrics server died, restart it.
+ if supervisor.alive && dead_pids.include?(metrics_server_pid)
+ @logger.info('Sidekiq metrics server terminated, restarting...')
+ metrics_server_pid = restart_metrics_server(wipe_metrics_dir: false)
+ all_pids = worker_pids + Array(metrics_server_pid)
+ else
+ # If a worker process died we'll just terminate the whole cluster.
+ # We let an external system (runit, kubernetes) handle the restart.
@logger.info('A worker terminated, shutting down the cluster')
- stop_metrics_server
- ProcessManagement.signal_processes(@processes, :TERM)
- break
+ ProcessManagement.signal_processes(all_pids - dead_pids, :TERM)
+ # Signal supervisor not to respawn workers and shut down.
+ []
end
end
end
- def start_metrics_server(wipe_metrics_dir: false)
+ def start_metrics_server
return unless metrics_server_enabled?
+ restart_metrics_server(wipe_metrics_dir: true)
+ end
+
+ def restart_metrics_server(wipe_metrics_dir: false)
@logger.info("Starting metrics server on port #{sidekiq_exporter_port}")
- @metrics_server_pid = MetricsServer.fork(
+ MetricsServer.fork(
'sidekiq',
metrics_dir: @metrics_dir,
wipe_metrics_dir: wipe_metrics_dir,
@@ -225,13 +193,6 @@ module Gitlab
!@dryrun && sidekiq_exporter_enabled? && exporter_has_a_unique_port?
end
- def stop_metrics_server
- return unless @metrics_server_pid
-
- @logger.info("Stopping metrics server (PID #{@metrics_server_pid})")
- ProcessManagement.signal(@metrics_server_pid, :TERM)
- end
-
def option_parser
OptionParser.new do |opt|
opt.banner = "#{File.basename(__FILE__)} [QUEUE,QUEUE] [QUEUE] ... [OPTIONS]"
diff --git a/sidekiq_cluster/sidekiq_cluster.rb b/sidekiq_cluster/sidekiq_cluster.rb
index c5139ab8874..c68cbe7c163 100644
--- a/sidekiq_cluster/sidekiq_cluster.rb
+++ b/sidekiq_cluster/sidekiq_cluster.rb
@@ -4,8 +4,6 @@ require_relative '../lib/gitlab/process_management'
module Gitlab
module SidekiqCluster
- CHECK_TERMINATE_INTERVAL_SECONDS = 1
-
# How long to wait when asking for a clean termination.
# It maps the Sidekiq default timeout:
# https://github.com/mperham/sidekiq/wiki/Signals#term
@@ -14,8 +12,12 @@ module Gitlab
# is given through arguments.
DEFAULT_SOFT_TIMEOUT_SECONDS = 25
- # After surpassing the soft timeout.
- DEFAULT_HARD_TIMEOUT_SECONDS = 5
+ # Additional time granted after surpassing the soft timeout
+ # before we kill the process.
+ TIMEOUT_GRACE_PERIOD_SECONDS = 5
+
+ # The singleton instance used to supervise cluster processes.
+ SidekiqProcessSupervisor = Class.new(Gitlab::ProcessSupervisor)
# Starts Sidekiq workers for the pairs of processes.
#