summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorKamil TrzciƄski <ayufan@ayufan.eu>2019-08-14 17:56:37 +0200
committerQingyu Zhao <qzhao@gitlab.com>2019-08-21 18:50:46 +1000
commit75e2302d0126c4bc8ea215ffb4e72612d44e73bb (patch)
tree9b7bb2eb248080aab20cd8de15cf73ceb8b97dd8 /lib
parentca622a3e13cf88d94c6b3c98554e9782d37d4ad5 (diff)
downloadgitlab-ce-75e2302d0126c4bc8ea215ffb4e72612d44e73bb.tar.gz
Allow to interrupt running jobs
This adds a middleware to track all threads for running jobs. This makes sidekiq to watch for redis-delivered notifications. This makes be able to send notification to interrupt running sidekiq jobs. This does not take into account any native code, as `Thread.raise` generates exception once the control gets back to Ruby. The separate measure should be taken to interrupt gRPC, shellouts, or anything else that escapes Ruby.
Diffstat (limited to 'lib')
-rw-r--r--lib/gitlab/sidekiq_middleware/jobs_threads.rb49
-rw-r--r--lib/gitlab/sidekiq_status/monitor.rb46
2 files changed, 95 insertions, 0 deletions
diff --git a/lib/gitlab/sidekiq_middleware/jobs_threads.rb b/lib/gitlab/sidekiq_middleware/jobs_threads.rb
new file mode 100644
index 00000000000..d0603bcee2d
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/jobs_threads.rb
@@ -0,0 +1,49 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ class JobsThreads
+ @@jobs = {} # rubocop:disable Style/ClassVars
+ MUTEX = Mutex.new
+
+ def call(worker, job, queue)
+ jid = job['jid']
+
+ MUTEX.synchronize do
+ @@jobs[jid] = Thread.current
+ end
+
+ return if self.class.cancelled?(jid)
+
+ yield
+ ensure
+ MUTEX.synchronize do
+ @@jobs.delete(jid)
+ end
+ end
+
+ def self.interrupt(jid)
+ MUTEX.synchronize do
+ thread = @@jobs[jid]
+ break unless thread
+
+ thread.raise(Interrupt)
+ thread
+ end
+ end
+
+ def self.cancelled?(jid)
+ Sidekiq.redis {|c| c.exists("cancelled-#{jid}") }
+ end
+
+ def self.mark_job_as_cancelled(jid)
+ Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 86400, 1) }
+ "Marked job as cancelled(if Sidekiq retry within 24 hours, the job will be skipped as `processed`). Jid: #{jid}"
+ end
+
+ def self.jobs
+ @@jobs
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_status/monitor.rb b/lib/gitlab/sidekiq_status/monitor.rb
new file mode 100644
index 00000000000..3fd9f02b166
--- /dev/null
+++ b/lib/gitlab/sidekiq_status/monitor.rb
@@ -0,0 +1,46 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqStatus
+ class Monitor < Daemon
+ include ::Gitlab::Utils::StrongMemoize
+
+ NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze
+
+ def start_working
+ Sidekiq.logger.info "Watching sidekiq monitor"
+
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.subscribe(NOTIFICATION_CHANNEL) do |on|
+ on.message do |channel, message|
+ Sidekiq.logger.info "Received #{message} on #{channel}..."
+ execute_job_cancel(message)
+ end
+ end
+ end
+ end
+
+ def self.cancel_job(jid)
+ Gitlab::Redis::SharedState.with do |redis|
+ redis.publish(NOTIFICATION_CHANNEL, jid)
+ "Notification sent. Job should be cancelled soon. Check log to confirm. Jid: #{jid}"
+ end
+ end
+
+ private
+
+ def execute_job_cancel(jid)
+ Gitlab::SidekiqMiddleware::JobsThreads.mark_job_as_cancelled(jid)
+
+ thread = Gitlab::SidekiqMiddleware::JobsThreads
+ .interrupt(jid)
+
+ if thread
+ Sidekiq.logger.info "Interrupted thread: #{thread} for #{jid}."
+ else
+ Sidekiq.logger.info "Did not find thread for #{jid}."
+ end
+ end
+ end
+ end
+end