1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
require 'mutex_m'
module Gitlab
module SidekiqMiddleware
class Shutdown
extend Mutex_m
# Default the RSS limit to 0, meaning the MemoryKiller is disabled
MAX_RSS = (ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] || 0).to_s.to_i
# Give Sidekiq 15 minutes of grace time after exceeding the RSS limit
GRACE_TIME = (ENV['SIDEKIQ_MEMORY_KILLER_GRACE_TIME'] || 15 * 60).to_s.to_i
# Wait 30 seconds for running jobs to finish during graceful shutdown
SHUTDOWN_WAIT = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_WAIT'] || 30).to_s.to_i
# Wait additional time for Sidekiq to finish terminatring
# and for subprocesses to terminate
ADDITIONAL_WAIT = 2
# This exception can be used to request that the middleware start shutting down Sidekiq
WantShutdown = Class.new(StandardError)
ShutdownWithoutRaise = Class.new(WantShutdown)
private_constant :ShutdownWithoutRaise
# For testing only, to avoid race conditions (?) in Rspec mocks.
attr_reader :trace
# We store the shutdown thread in a class variable to ensure that there
# can be only one shutdown thread in the process.
def self.create_shutdown_thread
mu_synchronize do
break unless @shutdown_thread.nil?
@shutdown_thread = Thread.new { yield }
end
end
# For testing only: so we can wait for the shutdown thread to finish.
def self.shutdown_thread
mu_synchronize { @shutdown_thread }
end
# For testing only: so that we can reset the global state before each test.
def self.clear_shutdown_thread
mu_synchronize { @shutdown_thread = nil }
end
def initialize
@trace = Queue.new if Rails.env.test?
end
def call(worker, job, queue)
shutdown_exception = nil
begin
yield
check_rss!
rescue WantShutdown => ex
shutdown_exception = ex
end
return unless shutdown_exception
self.class.create_shutdown_thread do
do_shutdown(worker, job, shutdown_exception)
end
raise shutdown_exception unless shutdown_exception.is_a?(ShutdownWithoutRaise)
end
private
def do_shutdown(worker, job, shutdown_exception)
Sidekiq.logger.warn "Sidekiq worker PID-#{pid} shutting down because of #{shutdown_exception} after job "\
"#{worker.class} JID-#{job['jid']}"
Sidekiq.logger.warn "Sidekiq worker PID-#{pid} will stop fetching new jobs in #{GRACE_TIME} seconds, and will be shut down #{SHUTDOWN_WAIT} seconds later"
# Wait `GRACE_TIME` to give the memory intensive job time to finish.
# Then, tell Sidekiq to stop fetching new jobs.
wait_and_signal(GRACE_TIME, 'SIGTSTP', 'stop fetching new jobs')
# Wait `SHUTDOWN_WAIT` to give already fetched jobs time to finish.
# Then, tell Sidekiq to gracefully shut down by giving jobs a few more
# moments to finish, killing and requeuing them if they didn't, and
# then terminating itself.
wait_and_signal(SHUTDOWN_WAIT, 'SIGTERM', 'gracefully shut down')
# Wait for Sidekiq to shutdown gracefully
# If it didn't then attempt to clean up any subprocesses
subprocesses_warning = "sending SIGINT to Sidekiq group PID-#{pid} to kill subprocesses"
warn_and_wait(Sidekiq.options[:timeout], subprocesses_warning) do
kill('SIGINT', -pid)
end
# Kill Sidekiq if it was unable to shutdown gracefully
wait_and_signal(ADDITIONAL_WAIT, 'SIGKILL', 'die')
end
def check_rss!
return unless MAX_RSS > 0
current_rss = get_rss
return unless current_rss > MAX_RSS
raise ShutdownWithoutRaise.new("current RSS #{current_rss} exceeds maximum RSS #{MAX_RSS}")
end
def get_rss
output, status = Gitlab::Popen.popen(%W(ps -o rss= -p #{pid}), Rails.root.to_s)
return 0 unless status.zero?
output.to_i
end
def wait_and_signal(time, signal, explanation)
warning = "sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})"
warn_and_wait(time, warning) do
kill(signal, pid)
end
end
def warn_and_wait(time, warning)
Sidekiq.logger.warn "waiting #{time} seconds before #{warning}"
sleep(time)
Sidekiq.logger.warn(warning)
yield
end
def pid
Process.pid
end
def sleep(time)
if Rails.env.test?
@trace << [:sleep, time]
else
Kernel.sleep(time)
end
end
def kill(signal, pid)
if Rails.env.test?
@trace << [:kill, signal, pid]
else
Process.kill(signal, pid)
end
end
end
end
end
|