1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
# frozen_string_literal: true
module Gitlab
  # Given a set of process IDs, the supervisor can monitor processes
  # for being alive and invoke a callback if some or all should go away.
  # The receiver of the callback can then act on this event, for instance
  # by restarting those processes or performing clean-up work.
  #
  # The supervisor will also trap termination signals if provided and
  # propagate those to the supervised processes. Any supervised processes
  # that do not terminate within a specified grace period will be killed.
  class ProcessSupervisor < Gitlab::Daemon
    DEFAULT_HEALTH_CHECK_INTERVAL_SECONDS = 5
    DEFAULT_TERMINATE_INTERVAL_SECONDS = 1
    DEFAULT_TERMINATE_TIMEOUT_SECONDS = 10

    # True while the supervision loop should keep running. Set to false as
    # soon as a shutdown begins or when no supervised PIDs remain.
    attr_reader :alive

    # @param health_check_interval_seconds [Numeric] pause between two health checks
    # @param check_terminate_interval_seconds [Numeric] polling interval used while
    #   waiting for signalled processes to exit
    # @param terminate_timeout_seconds [Numeric] grace period after which processes
    #   that are still alive are hard-stopped
    # @param term_signals [Array<Symbol>] signals that, once trapped, stop all
    #   supervised processes
    # @param forwarded_signals [Array<Symbol>] signals that, once trapped, are
    #   passed on to the supervised processes as-is
    # @param options [Hash] remaining keyword arguments, handed to the superclass
    def initialize(
      health_check_interval_seconds: DEFAULT_HEALTH_CHECK_INTERVAL_SECONDS,
      check_terminate_interval_seconds: DEFAULT_TERMINATE_INTERVAL_SECONDS,
      terminate_timeout_seconds: DEFAULT_TERMINATE_TIMEOUT_SECONDS,
      term_signals: %i(INT TERM),
      forwarded_signals: [],
      **options)
      super(**options)

      @term_signals = term_signals
      @forwarded_signals = forwarded_signals
      @health_check_interval_seconds = health_check_interval_seconds
      @check_terminate_interval_seconds = check_terminate_interval_seconds
      @terminate_timeout_seconds = terminate_timeout_seconds

      @pids = []
      @alive = false
    end

    # Starts a supervision loop for the given process ID(s).
    #
    # If any or all processes go away, the IDs of any dead processes will
    # be yielded to the given block, so callers can act on them.
    #
    # If the block returns a non-empty list of IDs, the supervisor will
    # start observing those processes instead. Otherwise it will shut down.
    #
    # A block is expected: the stored callback is invoked unconditionally
    # when a process death is detected.
    def supervise(pid_or_pids, &on_process_death)
      @pids = Array(pid_or_pids)
      @on_process_death = on_process_death

      trap_signals!

      start
    end

    # Shuts down the supervisor and all supervised processes with the given signal.
    # Does nothing when the supervisor is not alive.
    def shutdown(signal = :TERM)
      return unless @alive

      stop_processes(signal)
      stop
    end

    # The process IDs currently under supervision.
    def supervised_pids
      @pids
    end

    private

    # NOTE(review): start_working, stop_working and run_thread look like hooks
    # invoked by Gitlab::Daemon's start/stop - confirm against the superclass.
    def start_working
      @alive = true
    end

    def stop_working
      @alive = false
    end

    # Supervision loop: runs a health check, then pauses for the configured
    # interval, for as long as the supervisor is alive.
    def run_thread
      while @alive
        # Check before sleeping so that `@alive` is re-read immediately after
        # every sleep. A shutdown that happens during the pause then ends the
        # loop without one more health check, which would report the processes
        # we just terminated ourselves to the death callback.
        check_process_health

        sleep(@health_check_interval_seconds)
      end
    end

    # Reports dead processes to the callback and replaces the supervised set
    # with the survivors plus whatever PIDs the callback returned. The
    # supervisor stays alive only while that set is non-empty.
    def check_process_health
      return if all_alive?

      existing_pids = live_pids # Capture this value for the duration of the block.
      dead_pids = @pids - existing_pids
      new_pids = Array(@on_process_death.call(dead_pids))

      @pids = existing_pids + new_pids
      @alive = @pids.any?
    end

    # Signals every supervised process, then waits for them to exit.
    def stop_processes(signal)
      # Set this prior to shutting down so that shutdown hooks which read `alive`
      # know the supervisor is about to shut down.
      @alive = false

      # Shut down supervised processes.
      signal_all(signal)
      wait_for_termination
    end

    # Installs the handlers for termination signals (stop everything) and
    # for forwarded signals (pass the signal on unchanged).
    def trap_signals!
      ProcessManagement.trap_signals(@term_signals) do |signal|
        stop_processes(signal)
      end

      ProcessManagement.trap_signals(@forwarded_signals) do |signal|
        signal_all(signal)
      end
    end

    # Polls until no supervised process is alive or the grace period has
    # elapsed, then hard-stops whatever is still running.
    def wait_for_termination
      deadline = monotonic_time + @terminate_timeout_seconds
      sleep(@check_terminate_interval_seconds) while continue_waiting?(deadline)

      hard_stop_stuck_pids
    end

    # Monotonic clock reading in seconds (float), unaffected by wall-clock changes.
    def monotonic_time
      Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second)
    end

    def continue_waiting?(deadline)
      any_alive? && monotonic_time < deadline
    end

    def signal_all(signal)
      ProcessManagement.signal_processes(@pids, signal)
    end

    # NOTE(review): the leading minus in "-KILL" presumably reaches Process.kill,
    # where it targets process groups - confirm in ProcessManagement.
    def hard_stop_stuck_pids
      ProcessManagement.signal_processes(live_pids, "-KILL")
    end

    def any_alive?
      ProcessManagement.any_alive?(@pids)
    end

    def all_alive?
      ProcessManagement.all_alive?(@pids)
    end

    def live_pids
      ProcessManagement.pids_alive(@pids)
    end
  end
end
|