summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorStan Hu <stanhu@gmail.com>2019-08-21 18:37:36 +0000
committerStan Hu <stanhu@gmail.com>2019-08-21 18:37:36 +0000
commit9a0c1f64f540c1f07b2742a4c1c3ef4bee749bfd (patch)
treef85db52685bc95983c23d470b3443f8ac79fa848 /lib
parent5fb18d5766a198882e35dbbb332c8eee85320a61 (diff)
parent8d17c4dae6b4662dddffe9e2ddca8100e8cd3d0b (diff)
downloadgitlab-ce-9a0c1f64f540c1f07b2742a4c1c3ef4bee749bfd.tar.gz
Merge branch 'sidekiq-interrupt-running-jobs' into 'master'
Allow to interrupt running sidekiq jobs See merge request gitlab-org/gitlab-ce!31818
Diffstat (limited to 'lib')
-rw-r--r--lib/gitlab/daemon.rb5
-rw-r--r--lib/gitlab/sidekiq_middleware/monitor.rb16
-rw-r--r--lib/gitlab/sidekiq_monitor.rb182
3 files changed, 202 insertions, 1 deletions
diff --git a/lib/gitlab/daemon.rb b/lib/gitlab/daemon.rb
index 6d5fc4219fb..2f4ae010e74 100644
--- a/lib/gitlab/daemon.rb
+++ b/lib/gitlab/daemon.rb
@@ -46,7 +46,10 @@ module Gitlab
if thread
thread.wakeup if thread.alive?
- thread.join unless Thread.current == thread
+ begin
+ thread.join unless Thread.current == thread
+ rescue Exception # rubocop:disable Lint/RescueException
+ end
@thread = nil
end
end
diff --git a/lib/gitlab/sidekiq_middleware/monitor.rb b/lib/gitlab/sidekiq_middleware/monitor.rb
new file mode 100644
index 00000000000..0d88fe760d3
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/monitor.rb
@@ -0,0 +1,16 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ class Monitor
+ def call(worker, job, queue)
+ Gitlab::SidekiqMonitor.instance.within_job(job['jid'], queue) do
+ yield
+ end
+ rescue Gitlab::SidekiqMonitor::CancelledError
+ # ignore retries
+ raise Sidekiq::JobRetry::Skip
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_monitor.rb b/lib/gitlab/sidekiq_monitor.rb
new file mode 100644
index 00000000000..9842f1f53f7
--- /dev/null
+++ b/lib/gitlab/sidekiq_monitor.rb
@@ -0,0 +1,182 @@
+# frozen_string_literal: true
+
+module Gitlab
+ class SidekiqMonitor < Daemon
+ include ::Gitlab::Utils::StrongMemoize
+
+ NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze
+ CANCEL_DEADLINE = 24.hours.seconds
+ RECONNECT_TIME = 3.seconds
+
+ # We derive this error from `Exception` (not `StandardError`)
+ # so that it is treated as a very low-level exception
+ # that should not be caught by application code
+ CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException
+
+ attr_reader :jobs_thread
+ attr_reader :jobs_mutex
+
+ def initialize
+ super
+
+ @jobs_thread = {}
+ @jobs_mutex = Mutex.new
+ end
+
+ def within_job(jid, queue)
+ jobs_mutex.synchronize do
+ jobs_thread[jid] = Thread.current
+ end
+
+ if cancelled?(jid)
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'run',
+ queue: queue,
+ jid: jid,
+ canceled: true
+ )
+ raise CancelledError
+ end
+
+ yield
+ ensure
+ jobs_mutex.synchronize do
+ jobs_thread.delete(jid)
+ end
+ end
+
+ def self.cancel_job(jid)
+ payload = {
+ action: 'cancel',
+ jid: jid
+ }.to_json
+
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
+ redis.publish(NOTIFICATION_CHANNEL, payload)
+ end
+ end
+
+ private
+
+ # Daemon main loop: logs the start, then keeps calling process_messages
+ # (a Redis subscription) for as long as enabled? holds, sleeping
+ # RECONNECT_TIME after each call returns. The ensure clause logs the
+ # stop however the loop ends.
+ def start_working
+ Sidekiq.logger.info(
+ class: self.class.to_s,
+ action: 'start',
+ message: 'Starting Monitor Daemon'
+ )
+
+ while enabled?
+ process_messages
+ sleep(RECONNECT_TIME)
+ end
+
+ ensure
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'stop',
+ message: 'Stopping Monitor Daemon'
+ )
+ end
+
+ # Raises Interrupt inside the daemon thread if that thread is still alive.
+ # NOTE(review): `thread` is not defined in this class; presumably it is
+ # provided by Daemon - confirm it cannot be nil when this is called.
+ def stop_working
+ thread.raise(Interrupt) if thread.alive?
+ end
+
+ def process_messages
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.subscribe(NOTIFICATION_CHANNEL) do |on|
+ on.message do |channel, message|
+ process_message(message)
+ end
+ end
+ end
+ rescue Exception => e # rubocop:disable Lint/RescueException
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'exception',
+ message: e.message
+ )
+
+ # we re-raise system exceptions
+ raise e unless e.is_a?(StandardError)
+ end
+
+ def process_message(message)
+ Sidekiq.logger.info(
+ class: self.class.to_s,
+ channel: NOTIFICATION_CHANNEL,
+ message: 'Received payload on channel',
+ payload: message
+ )
+
+ message = safe_parse(message)
+ return unless message
+
+ case message['action']
+ when 'cancel'
+ process_job_cancel(message['jid'])
+ else
+ # unknown message
+ end
+ end
+
+ def safe_parse(message)
+ JSON.parse(message)
+ rescue JSON::ParserError
+ end
+
+ # Cancels the job `jid` if a thread is currently registered for it, by
+ # raising CancelledError inside that thread. Does nothing when `jid` is
+ # nil or no thread is registered under it.
+ def process_job_cancel(jid)
+ return unless jid
+
+ # try to find thread without lock
+ return unless find_thread_unsafe(jid)
+
+ # NOTE(review): the locked lookup runs in a separate thread, presumably
+ # so the caller is not blocked while waiting for jobs_mutex - confirm.
+ Thread.new do
+ # try to find a thread, but with guaranteed
+ # that handle for thread corresponds to actually
+ # running job
+ find_thread_with_lock(jid) do |thread|
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'cancel',
+ message: 'Canceling thread with CancelledError',
+ jid: jid,
+ thread_id: thread.object_id
+ )
+
+ # find_thread_with_lock only yields a non-nil thread, so `&.` is
+ # purely defensive here
+ thread&.raise(CancelledError)
+ end
+ end
+ end
+
+ # Looks up the thread registered for `jid` directly in jobs_thread,
+ # without taking jobs_mutex. Returns nil when nothing is registered.
+ # The result is only a hint: the entry may be removed right after the
+ # lookup, so use find_thread_with_lock when the thread must be acted upon.
+ def find_thread_unsafe(jid)
+ jobs_thread[jid]
+ end
+
+ # Looks up the thread registered for `jid` while holding jobs_mutex and,
+ # when one is found, yields it to the block with the mutex still held.
+ # Passing the thread to a block (rather than returning it for later use)
+ # ensures the caller acts on it while it is still registered.
+ # Returns the thread, or nil when none is registered.
+ def find_thread_with_lock(jid)
+   # don't try to lock if we cannot find the thread
+   return unless find_thread_unsafe(jid)
+
+   jobs_mutex.synchronize do
+     registered = find_thread_unsafe(jid)
+     yield(registered) if registered
+     registered
+   end
+ end
+
+ def cancelled?(jid)
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.exists(self.class.cancel_job_key(jid))
+ end
+ end
+
+ # Redis key under which the cancel flag for job `jid` is stored.
+ # Note: although written below `private`, that keyword does not apply to
+ # `def self.` methods, so this remains a public class method.
+ def self.cancel_job_key(jid)
+ "sidekiq:cancel:#{jid}"
+ end
+ end
+end