diff options
author | Stan Hu <stanhu@gmail.com> | 2019-08-21 18:37:36 +0000 |
---|---|---|
committer | Stan Hu <stanhu@gmail.com> | 2019-08-21 18:37:36 +0000 |
commit | 9a0c1f64f540c1f07b2742a4c1c3ef4bee749bfd (patch) | |
tree | f85db52685bc95983c23d470b3443f8ac79fa848 /lib | |
parent | 5fb18d5766a198882e35dbbb332c8eee85320a61 (diff) | |
parent | 8d17c4dae6b4662dddffe9e2ddca8100e8cd3d0b (diff) | |
download | gitlab-ce-9a0c1f64f540c1f07b2742a4c1c3ef4bee749bfd.tar.gz |
Merge branch 'sidekiq-interrupt-running-jobs' into 'master'
Allow to interrupt running sidekiq jobs
See merge request gitlab-org/gitlab-ce!31818
Diffstat (limited to 'lib')
-rw-r--r-- | lib/gitlab/daemon.rb | 5 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_middleware/monitor.rb | 16 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_monitor.rb | 182 |
3 files changed, 202 insertions, 1 deletions
diff --git a/lib/gitlab/daemon.rb b/lib/gitlab/daemon.rb index 6d5fc4219fb..2f4ae010e74 100644 --- a/lib/gitlab/daemon.rb +++ b/lib/gitlab/daemon.rb @@ -46,7 +46,10 @@ module Gitlab if thread thread.wakeup if thread.alive? - thread.join unless Thread.current == thread + begin + thread.join unless Thread.current == thread + rescue Exception # rubocop:disable Lint/RescueException + end @thread = nil end end diff --git a/lib/gitlab/sidekiq_middleware/monitor.rb b/lib/gitlab/sidekiq_middleware/monitor.rb new file mode 100644 index 00000000000..0d88fe760d3 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/monitor.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + class Monitor + def call(worker, job, queue) + Gitlab::SidekiqMonitor.instance.within_job(job['jid'], queue) do + yield + end + rescue Gitlab::SidekiqMonitor::CancelledError + # ignore retries + raise Sidekiq::JobRetry::Skip + end + end + end +end diff --git a/lib/gitlab/sidekiq_monitor.rb b/lib/gitlab/sidekiq_monitor.rb new file mode 100644 index 00000000000..9842f1f53f7 --- /dev/null +++ b/lib/gitlab/sidekiq_monitor.rb @@ -0,0 +1,182 @@ +# frozen_string_literal: true + +module Gitlab + class SidekiqMonitor < Daemon + include ::Gitlab::Utils::StrongMemoize + + NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze + CANCEL_DEADLINE = 24.hours.seconds + RECONNECT_TIME = 3.seconds + + # We use exception derived from `Exception` + # to consider this as an very low-level exception + # that should not be caught by application + CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException + + attr_reader :jobs_thread + attr_reader :jobs_mutex + + def initialize + super + + @jobs_thread = {} + @jobs_mutex = Mutex.new + end + + def within_job(jid, queue) + jobs_mutex.synchronize do + jobs_thread[jid] = Thread.current + end + + if cancelled?(jid) + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'run', + queue: queue, + jid: jid, + canceled: true + ) + raise CancelledError + end + + yield + ensure + jobs_mutex.synchronize do + jobs_thread.delete(jid) + end + end + + def self.cancel_job(jid) + payload = { + action: 'cancel', + jid: jid + }.to_json + + ::Gitlab::Redis::SharedState.with do |redis| + redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1) + redis.publish(NOTIFICATION_CHANNEL, payload) + end + end + + private + + def start_working + Sidekiq.logger.info( + class: self.class.to_s, + action: 'start', + message: 'Starting Monitor Daemon' + ) + + while enabled? + process_messages + sleep(RECONNECT_TIME) + end + + ensure + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'stop', + message: 'Stopping Monitor Daemon' + ) + end + + def stop_working + thread.raise(Interrupt) if thread.alive? + end + + def process_messages + ::Gitlab::Redis::SharedState.with do |redis| + redis.subscribe(NOTIFICATION_CHANNEL) do |on| + on.message do |channel, message| + process_message(message) + end + end + end + rescue Exception => e # rubocop:disable Lint/RescueException + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'exception', + message: e.message + ) + + # we re-raise system exceptions + raise e unless e.is_a?(StandardError) + end + + def process_message(message) + Sidekiq.logger.info( + class: self.class.to_s, + channel: NOTIFICATION_CHANNEL, + message: 'Received payload on channel', + payload: message + ) + + message = safe_parse(message) + return unless message + + case message['action'] + when 'cancel' + process_job_cancel(message['jid']) + else + # unknown message + end + end + + def safe_parse(message) + JSON.parse(message) + rescue JSON::ParserError + end + + def process_job_cancel(jid) + return unless jid + + # try to find thread without lock + return unless find_thread_unsafe(jid) + + Thread.new do + # try to find a thread, but with guaranteed + # that handle for thread corresponds to actually + # running job + find_thread_with_lock(jid) do |thread| + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'cancel', + message: 'Canceling thread with CancelledError', + jid: jid, + thread_id: thread.object_id + ) + + thread&.raise(CancelledError) + end + end + end + + # This method needs to be thread-safe + # This is why it passes thread in block, + # to ensure that we do process this thread + def find_thread_unsafe(jid) + jobs_thread[jid] + end + + def find_thread_with_lock(jid) + # don't try to lock if we cannot find the thread + return unless find_thread_unsafe(jid) + + jobs_mutex.synchronize do + find_thread_unsafe(jid).tap do |thread| + yield(thread) if thread + end + end + end + + def cancelled?(jid) + ::Gitlab::Redis::SharedState.with do |redis| + redis.exists(self.class.cancel_job_key(jid)) + end + end + + def self.cancel_job_key(jid) + "sidekiq:cancel:#{jid}" + end + end +end |