summaryrefslogtreecommitdiff
path: root/lib/gitlab/sidekiq_middleware/shutdown.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/sidekiq_middleware/shutdown.rb')
-rw-r--r--lib/gitlab/sidekiq_middleware/shutdown.rb133
1 files changed, 133 insertions, 0 deletions
diff --git a/lib/gitlab/sidekiq_middleware/shutdown.rb b/lib/gitlab/sidekiq_middleware/shutdown.rb
new file mode 100644
index 00000000000..c2b8d6de66e
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/shutdown.rb
@@ -0,0 +1,133 @@
+require 'mutex_m'
+
+module Gitlab
+ module SidekiqMiddleware
+ class Shutdown
+ extend Mutex_m
+
+ # Default the RSS limit to 0, meaning the MemoryKiller is disabled
+ MAX_RSS = (ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] || 0).to_s.to_i
+ # Give Sidekiq 15 minutes of grace time after exceeding the RSS limit
+ GRACE_TIME = (ENV['SIDEKIQ_MEMORY_KILLER_GRACE_TIME'] || 15 * 60).to_s.to_i
+ # Wait 30 seconds for running jobs to finish during graceful shutdown
+ SHUTDOWN_WAIT = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_WAIT'] || 30).to_s.to_i
+
+ # This exception can be used to request that the middleware start shutting down Sidekiq
+ WantShutdown = Class.new(StandardError)
+
+ ShutdownWithoutRaise = Class.new(WantShutdown)
+ private_constant :ShutdownWithoutRaise
+
+ # For testing only, to avoid race conditions (?) in Rspec mocks.
+ attr_reader :trace
+
+ # We store the shutdown thread in a class variable to ensure that there
+ # can be only one shutdown thread in the process.
+ def self.create_shutdown_thread
+ mu_synchronize do
+ return unless @shutdown_thread.nil?
+
+ @shutdown_thread = Thread.new { yield }
+ end
+ end
+
+ # For testing only: so we can wait for the shutdown thread to finish.
+ def self.shutdown_thread
+ mu_synchronize { @shutdown_thread }
+ end
+
+ # For testing only: so that we can reset the global state before each test.
+ def self.clear_shutdown_thread
+ mu_synchronize { @shutdown_thread = nil }
+ end
+
+ def initialize
+ @trace = Queue.new if Rails.env.test?
+ end
+
+ def call(worker, job, queue)
+ shutdown_exception = nil
+
+ begin
+ yield
+ check_rss!
+ rescue WantShutdown => ex
+ shutdown_exception = ex
+ end
+
+ return unless shutdown_exception
+
+ self.class.create_shutdown_thread do
+ do_shutdown(worker, job, shutdown_exception)
+ end
+
+ raise shutdown_exception unless shutdown_exception.is_a?(ShutdownWithoutRaise)
+ end
+
+ private
+
+ def do_shutdown(worker, job, shutdown_exception)
+ Sidekiq.logger.warn "Sidekiq worker PID-#{pid} shutting down because of #{shutdown_exception} after job "\
+ "#{worker.class} JID-#{job['jid']}"
+ Sidekiq.logger.warn "Sidekiq worker PID-#{pid} will stop fetching new jobs in #{GRACE_TIME} seconds, and will be shut down #{SHUTDOWN_WAIT} seconds later"
+
+ # Wait `GRACE_TIME` to give the memory intensive job time to finish.
+ # Then, tell Sidekiq to stop fetching new jobs.
+ wait_and_signal(GRACE_TIME, 'SIGTSTP', 'stop fetching new jobs')
+
+ # Wait `SHUTDOWN_WAIT` to give already fetched jobs time to finish.
+ # Then, tell Sidekiq to gracefully shut down by giving jobs a few more
+ # moments to finish, killing and requeuing them if they didn't, and
+ # then terminating itself.
+ wait_and_signal(SHUTDOWN_WAIT, 'SIGTERM', 'gracefully shut down')
+
+ # Wait for Sidekiq to shutdown gracefully, and kill it if it didn't.
+ wait_and_signal(Sidekiq.options[:timeout] + 2, 'SIGKILL', 'die')
+ end
+
+ def check_rss!
+ return unless MAX_RSS > 0
+
+ current_rss = get_rss
+ return unless current_rss > MAX_RSS
+
+ raise ShutdownWithoutRaise.new("current RSS #{current_rss} exceeds maximum RSS #{MAX_RSS}")
+ end
+
+ def get_rss
+ output, status = Gitlab::Popen.popen(%W(ps -o rss= -p #{pid}), Rails.root.to_s)
+ return 0 unless status.zero?
+
+ output.to_i
+ end
+
+ def wait_and_signal(time, signal, explanation)
+ Sidekiq.logger.warn "waiting #{time} seconds before sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})"
+ sleep(time)
+
+ Sidekiq.logger.warn "sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})"
+ kill(signal, pid)
+ end
+
+ def pid
+ Process.pid
+ end
+
+ def sleep(time)
+ if Rails.env.test?
+ @trace << [:sleep, time]
+ else
+ Kernel.sleep(time)
+ end
+ end
+
+ def kill(signal, pid)
+ if Rails.env.test?
+ @trace << [:kill, signal, pid]
+ else
+ Process.kill(signal, pid)
+ end
+ end
+ end
+ end
+end