diff options
Diffstat (limited to 'lib/gitlab/process_supervisor.rb')
-rw-r--r-- | lib/gitlab/process_supervisor.rb | 149 |
1 files changed, 149 insertions, 0 deletions
diff --git a/lib/gitlab/process_supervisor.rb b/lib/gitlab/process_supervisor.rb new file mode 100644 index 00000000000..18fd24aa582 --- /dev/null +++ b/lib/gitlab/process_supervisor.rb @@ -0,0 +1,149 @@ +# frozen_string_literal: true + +module Gitlab + # Given a set of process IDs, the supervisor can monitor processes + # for being alive and invoke a callback if some or all should go away. + # The receiver of the callback can then act on this event, for instance + # by restarting those processes or performing clean-up work. + # + # The supervisor will also trap termination signals if provided and + # propagate those to the supervised processes. Any supervised processes + # that do not terminate within a specified grace period will be killed. + class ProcessSupervisor < Gitlab::Daemon + DEFAULT_HEALTH_CHECK_INTERVAL_SECONDS = 5 + DEFAULT_TERMINATE_INTERVAL_SECONDS = 1 + DEFAULT_TERMINATE_TIMEOUT_SECONDS = 10 + + attr_reader :alive + + def initialize( + health_check_interval_seconds: DEFAULT_HEALTH_CHECK_INTERVAL_SECONDS, + check_terminate_interval_seconds: DEFAULT_TERMINATE_INTERVAL_SECONDS, + terminate_timeout_seconds: DEFAULT_TERMINATE_TIMEOUT_SECONDS, + term_signals: %i(INT TERM), + forwarded_signals: [], + **options) + super(**options) + + @term_signals = term_signals + @forwarded_signals = forwarded_signals + @health_check_interval_seconds = health_check_interval_seconds + @check_terminate_interval_seconds = check_terminate_interval_seconds + @terminate_timeout_seconds = terminate_timeout_seconds + + @pids = [] + @alive = false + end + + # Starts a supervision loop for the given process ID(s). + # + # If any or all processes go away, the IDs of any dead processes will + # be yielded to the given block, so callers can act on them. + # + # If the block returns a non-empty list of IDs, the supervisor will + # start observing those processes instead. Otherwise it will shut down. + def supervise(pid_or_pids, &on_process_death) + @pids = Array(pid_or_pids) + @on_process_death = on_process_death + + trap_signals! + + start + end + + # Shuts down the supervisor and all supervised processes with the given signal. + def shutdown(signal = :TERM) + return unless @alive + + stop_processes(signal) + stop + end + + def supervised_pids + @pids + end + + private + + def start_working + @alive = true + end + + def stop_working + @alive = false + end + + def run_thread + while @alive + sleep(@health_check_interval_seconds) + + check_process_health + end + end + + def check_process_health + unless all_alive? + existing_pids = live_pids # Capture this value for the duration of the block. + dead_pids = @pids - existing_pids + new_pids = Array(@on_process_death.call(dead_pids)) + @pids = existing_pids + new_pids + @alive = @pids.any? + end + end + + def stop_processes(signal) + # Set this prior to shutting down so that shutdown hooks which read `alive` + # know the supervisor is about to shut down. + @alive = false + + # Shut down supervised processes. + signal_all(signal) + wait_for_termination + end + + def trap_signals! + ProcessManagement.trap_signals(@term_signals) do |signal| + stop_processes(signal) + end + + ProcessManagement.trap_signals(@forwarded_signals) do |signal| + signal_all(signal) + end + end + + def wait_for_termination + deadline = monotonic_time + @terminate_timeout_seconds + sleep(@check_terminate_interval_seconds) while continue_waiting?(deadline) + + hard_stop_stuck_pids + end + + def monotonic_time + Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second) + end + + def continue_waiting?(deadline) + any_alive? && monotonic_time < deadline + end + + def signal_all(signal) + ProcessManagement.signal_processes(@pids, signal) + end + + def hard_stop_stuck_pids + ProcessManagement.signal_processes(live_pids, "-KILL") + end + + def any_alive? + ProcessManagement.any_alive?(@pids) + end + + def all_alive? + ProcessManagement.all_alive?(@pids) + end + + def live_pids + ProcessManagement.pids_alive(@pids) + end + end +end |