summaryrefslogtreecommitdiff
path: root/lib/gitlab/sidekiq_monitor.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/sidekiq_monitor.rb')
-rw-r--r--lib/gitlab/sidekiq_monitor.rb182
1 files changed, 182 insertions, 0 deletions
diff --git a/lib/gitlab/sidekiq_monitor.rb b/lib/gitlab/sidekiq_monitor.rb
new file mode 100644
index 00000000000..9842f1f53f7
--- /dev/null
+++ b/lib/gitlab/sidekiq_monitor.rb
@@ -0,0 +1,182 @@
+# frozen_string_literal: true
+
+module Gitlab
+ class SidekiqMonitor < Daemon
+ include ::Gitlab::Utils::StrongMemoize
+
+ NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze
+ CANCEL_DEADLINE = 24.hours.seconds
+ RECONNECT_TIME = 3.seconds
+
+ # We use exception derived from `Exception`
+ # to consider this as an very low-level exception
+ # that should not be caught by application
+ CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException
+
+ attr_reader :jobs_thread
+ attr_reader :jobs_mutex
+
+ def initialize
+ super
+
+ @jobs_thread = {}
+ @jobs_mutex = Mutex.new
+ end
+
+ def within_job(jid, queue)
+ jobs_mutex.synchronize do
+ jobs_thread[jid] = Thread.current
+ end
+
+ if cancelled?(jid)
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'run',
+ queue: queue,
+ jid: jid,
+ canceled: true
+ )
+ raise CancelledError
+ end
+
+ yield
+ ensure
+ jobs_mutex.synchronize do
+ jobs_thread.delete(jid)
+ end
+ end
+
+ def self.cancel_job(jid)
+ payload = {
+ action: 'cancel',
+ jid: jid
+ }.to_json
+
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
+ redis.publish(NOTIFICATION_CHANNEL, payload)
+ end
+ end
+
+ private
+
+ def start_working
+ Sidekiq.logger.info(
+ class: self.class.to_s,
+ action: 'start',
+ message: 'Starting Monitor Daemon'
+ )
+
+ while enabled?
+ process_messages
+ sleep(RECONNECT_TIME)
+ end
+
+ ensure
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'stop',
+ message: 'Stopping Monitor Daemon'
+ )
+ end
+
+ def stop_working
+ thread.raise(Interrupt) if thread.alive?
+ end
+
+ def process_messages
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.subscribe(NOTIFICATION_CHANNEL) do |on|
+ on.message do |channel, message|
+ process_message(message)
+ end
+ end
+ end
+ rescue Exception => e # rubocop:disable Lint/RescueException
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'exception',
+ message: e.message
+ )
+
+ # we re-raise system exceptions
+ raise e unless e.is_a?(StandardError)
+ end
+
+ def process_message(message)
+ Sidekiq.logger.info(
+ class: self.class.to_s,
+ channel: NOTIFICATION_CHANNEL,
+ message: 'Received payload on channel',
+ payload: message
+ )
+
+ message = safe_parse(message)
+ return unless message
+
+ case message['action']
+ when 'cancel'
+ process_job_cancel(message['jid'])
+ else
+ # unknown message
+ end
+ end
+
+ def safe_parse(message)
+ JSON.parse(message)
+ rescue JSON::ParserError
+ end
+
+ def process_job_cancel(jid)
+ return unless jid
+
+ # try to find thread without lock
+ return unless find_thread_unsafe(jid)
+
+ Thread.new do
+ # try to find a thread, but with guaranteed
+ # that handle for thread corresponds to actually
+ # running job
+ find_thread_with_lock(jid) do |thread|
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'cancel',
+ message: 'Canceling thread with CancelledError',
+ jid: jid,
+ thread_id: thread.object_id
+ )
+
+ thread&.raise(CancelledError)
+ end
+ end
+ end
+
+ # This method needs to be thread-safe
+ # This is why it passes thread in block,
+ # to ensure that we do process this thread
+ def find_thread_unsafe(jid)
+ jobs_thread[jid]
+ end
+
+ def find_thread_with_lock(jid)
+ # don't try to lock if we cannot find the thread
+ return unless find_thread_unsafe(jid)
+
+ jobs_mutex.synchronize do
+ find_thread_unsafe(jid).tap do |thread|
+ yield(thread) if thread
+ end
+ end
+ end
+
+ def cancelled?(jid)
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.exists(self.class.cancel_job_key(jid))
+ end
+ end
+
+ def self.cancel_job_key(jid)
+ "sidekiq:cancel:#{jid}"
+ end
+ end
+end