summaryrefslogtreecommitdiff
path: root/lib/gitlab/sidekiq_middleware/shutdown.rb
blob: 19f3be83bcebc5bf512f130fed6bab1ac8cedbb2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# frozen_string_literal: true

require 'mutex_m'

module Gitlab
  module SidekiqMiddleware
    class Shutdown
      extend Mutex_m

      # Default the RSS limit to 0, meaning the MemoryKiller is disabled
      MAX_RSS = (ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] || 0).to_s.to_i
      # Give Sidekiq 15 minutes of grace time after exceeding the RSS limit
      GRACE_TIME = (ENV['SIDEKIQ_MEMORY_KILLER_GRACE_TIME'] || 15 * 60).to_s.to_i
      # Wait 30 seconds for running jobs to finish during graceful shutdown
      SHUTDOWN_WAIT = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_WAIT'] || 30).to_s.to_i

      # This exception can be used to request that the middleware start shutting down Sidekiq
      WantShutdown = Class.new(StandardError)

      ShutdownWithoutRaise = Class.new(WantShutdown)
      private_constant :ShutdownWithoutRaise

      # For testing only, to avoid race conditions (?) in Rspec mocks.
      attr_reader :trace

      # We store the shutdown thread in a class variable to ensure that there
      # can be only one shutdown thread in the process.
      def self.create_shutdown_thread
        mu_synchronize do
          break unless @shutdown_thread.nil?

          @shutdown_thread = Thread.new { yield }
        end
      end

      # For testing only: so we can wait for the shutdown thread to finish.
      def self.shutdown_thread
        mu_synchronize { @shutdown_thread }
      end

      # For testing only: so that we can reset the global state before each test.
      def self.clear_shutdown_thread
        mu_synchronize { @shutdown_thread = nil }
      end

      def initialize
        @trace = Queue.new if Rails.env.test?
      end

      def call(worker, job, queue)
        shutdown_exception = nil

        begin
          yield
          check_rss!
        rescue WantShutdown => ex
          shutdown_exception = ex
        end

        return unless shutdown_exception

        self.class.create_shutdown_thread do
          do_shutdown(worker, job, shutdown_exception)
        end

        raise shutdown_exception unless shutdown_exception.is_a?(ShutdownWithoutRaise)
      end

      private

      def do_shutdown(worker, job, shutdown_exception)
        Sidekiq.logger.warn "Sidekiq worker PID-#{pid} shutting down because of #{shutdown_exception} after job "\
          "#{worker.class} JID-#{job['jid']}"
        Sidekiq.logger.warn "Sidekiq worker PID-#{pid} will stop fetching new jobs in #{GRACE_TIME} seconds, and will be shut down #{SHUTDOWN_WAIT} seconds later"

        # Wait `GRACE_TIME` to give the memory intensive job time to finish.
        # Then, tell Sidekiq to stop fetching new jobs.
        wait_and_signal(GRACE_TIME, 'SIGTSTP', 'stop fetching new jobs')

        # Wait `SHUTDOWN_WAIT` to give already fetched jobs time to finish.
        # Then, tell Sidekiq to gracefully shut down by giving jobs a few more
        # moments to finish, killing and requeuing them if they didn't, and
        # then terminating itself.
        wait_and_signal(SHUTDOWN_WAIT, 'SIGTERM', 'gracefully shut down')

        # Wait for Sidekiq to shutdown gracefully, and kill it if it didn't.
        wait_and_signal(Sidekiq.options[:timeout] + 2, 'SIGKILL', 'die')
      end

      def check_rss!
        return unless MAX_RSS > 0

        current_rss = get_rss
        return unless current_rss > MAX_RSS

        raise ShutdownWithoutRaise.new("current RSS #{current_rss} exceeds maximum RSS #{MAX_RSS}")
      end

      def get_rss
        output, status = Gitlab::Popen.popen(%W(ps -o rss= -p #{pid}), Rails.root.to_s)
        return 0 unless status.zero?

        output.to_i
      end

      def wait_and_signal(time, signal, explanation)
        Sidekiq.logger.warn "waiting #{time} seconds before sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})"
        sleep(time)

        Sidekiq.logger.warn "sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})"
        kill(signal, pid)
      end

      def pid
        Process.pid
      end

      def sleep(time)
        if Rails.env.test?
          @trace << [:sleep, time]
        else
          Kernel.sleep(time)
        end
      end

      def kill(signal, pid)
        if Rails.env.test?
          @trace << [:kill, signal, pid]
        else
          Process.kill(signal, pid)
        end
      end
    end
  end
end