summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKamil Trzciński <ayufan@ayufan.eu>2019-08-14 17:56:37 +0200
committerQingyu Zhao <qzhao@gitlab.com>2019-08-21 18:50:46 +1000
commit75e2302d0126c4bc8ea215ffb4e72612d44e73bb (patch)
tree9b7bb2eb248080aab20cd8de15cf73ceb8b97dd8
parentca622a3e13cf88d94c6b3c98554e9782d37d4ad5 (diff)
downloadgitlab-ce-75e2302d0126c4bc8ea215ffb4e72612d44e73bb.tar.gz
Allow to interrupt running jobs
This adds a middleware that tracks the threads of all running jobs, and makes Sidekiq watch for Redis-delivered notifications, so that a notification can be sent to interrupt a running Sidekiq job. This does not take native code into account, as `Thread.raise` only generates the exception once control gets back to Ruby. A separate measure should be taken to interrupt gRPC, shellouts, or anything else that escapes Ruby.
-rw-r--r--config/initializers/sidekiq.rb3
-rw-r--r--lib/gitlab/sidekiq_middleware/jobs_threads.rb49
-rw-r--r--lib/gitlab/sidekiq_status/monitor.rb46
-rw-r--r--spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb83
4 files changed, 181 insertions, 0 deletions
diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb
index 7217f098fd9..b05d4342e23 100644
--- a/config/initializers/sidekiq.rb
+++ b/config/initializers/sidekiq.rb
@@ -33,6 +33,7 @@ Sidekiq.configure_server do |config|
config.redis = queues_config_hash
config.server_middleware do |chain|
+ chain.add Gitlab::SidekiqMiddleware::JobsThreads unless ENV['DISABLE_SIDEKIQ_INTERRUPT_RUNNING_JOBS']
chain.add Gitlab::SidekiqMiddleware::Metrics if Settings.monitoring.sidekiq_exporter
chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] && !enable_json_logs
chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS']
@@ -57,6 +58,8 @@ Sidekiq.configure_server do |config|
# Clear any connections that might have been obtained before starting
# Sidekiq (e.g. in an initializer).
ActiveRecord::Base.clear_all_connections!
+
+ Gitlab::SidekiqStatus::Monitor.instance.start unless ENV['DISABLE_SIDEKIQ_INTERRUPT_RUNNING_JOBS']
end
if enable_reliable_fetch?
diff --git a/lib/gitlab/sidekiq_middleware/jobs_threads.rb b/lib/gitlab/sidekiq_middleware/jobs_threads.rb
new file mode 100644
index 00000000000..d0603bcee2d
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/jobs_threads.rb
@@ -0,0 +1,86 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module SidekiqMiddleware
+    # Sidekiq server middleware that records which thread is running each job
+    # (keyed by jid) so a running job can be interrupted, and that skips jobs
+    # whose jid has been marked as cancelled in Redis.
+    class JobsThreads
+      # jid => Thread for the jobs currently inside #call; guarded by MUTEX.
+      @@jobs = {} # rubocop:disable Style/ClassVars
+      MUTEX = Mutex.new
+
+      # Lifetime of a cancellation marker in Redis, in seconds (24 hours).
+      CANCELLED_KEY_TTL = 86_400
+
+      # Registers the current thread under the job's jid for the duration of
+      # the job, and runs the job unless it has been marked as cancelled.
+      #
+      # @param worker [Object] unused
+      # @param job [Hash] job payload; only 'jid' is read
+      # @param queue [String] unused
+      # @return [Object, nil] the result of the block, or nil when the job
+      #   is cancelled and therefore skipped
+      def call(worker, job, queue)
+        jid = job['jid']
+
+        MUTEX.synchronize do
+          @@jobs[jid] = Thread.current
+        end
+
+        return if self.class.cancelled?(jid)
+
+        yield
+      ensure
+        # Always unregister, including when the job raised or was interrupted.
+        MUTEX.synchronize do
+          @@jobs.delete(jid)
+        end
+      end
+
+      # Raises Interrupt in the thread registered for the given jid, if any.
+      #
+      # @param jid [String]
+      # @return [Thread, nil] the interrupted thread, or nil when no thread
+      #   is registered for jid
+      def self.interrupt(jid)
+        MUTEX.synchronize do
+          thread = @@jobs[jid]
+          thread&.raise(Interrupt)
+          thread
+        end
+      end
+
+      # @param jid [String]
+      # @return [Boolean] whether a cancellation marker exists for jid
+      def self.cancelled?(jid)
+        found = Sidekiq.redis { |c| c.exists(cancelled_key(jid)) }
+
+        # Depending on the redis client version `exists` answers with a
+        # boolean or with a key count; 0 is truthy in Ruby, so normalise it.
+        found.is_a?(Integer) ? found.positive? : found
+      end
+
+      # Stores a cancellation marker for jid that expires after
+      # CANCELLED_KEY_TTL seconds.
+      #
+      # @param jid [String]
+      # @return [String] a human-readable confirmation message
+      def self.mark_job_as_cancelled(jid)
+        Sidekiq.redis { |c| c.setex(cancelled_key(jid), CANCELLED_KEY_TTL, 1) }
+        "Marked job as cancelled(if Sidekiq retry within 24 hours, the job will be skipped as `processed`). Jid: #{jid}"
+      end
+
+      # @return [Hash{String => Thread}] the live registry (not a copy)
+      def self.jobs
+        @@jobs
+      end
+
+      # Redis key holding the cancellation marker for jid.
+      def self.cancelled_key(jid)
+        "cancelled-#{jid}"
+      end
+      private_class_method :cancelled_key
+    end
+  end
+end
diff --git a/lib/gitlab/sidekiq_status/monitor.rb b/lib/gitlab/sidekiq_status/monitor.rb
new file mode 100644
index 00000000000..3fd9f02b166
--- /dev/null
+++ b/lib/gitlab/sidekiq_status/monitor.rb
@@ -0,0 +1,68 @@
+# frozen_string_literal: true
+
+module Gitlab
+  module SidekiqStatus
+    # Daemon that listens on a Redis pub/sub channel for job ids and, for each
+    # one received, marks the job as cancelled and interrupts the thread
+    # running it in this process, if there is one.
+    class Monitor < Daemon
+      include ::Gitlab::Utils::StrongMemoize
+
+      NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze
+
+      # Subscribes to NOTIFICATION_CHANNEL and treats every message published
+      # on it as the jid of a job to cancel.
+      def start_working
+        Sidekiq.logger.info "Watching sidekiq monitor"
+
+        ::Gitlab::Redis::SharedState.with do |redis|
+          redis.subscribe(NOTIFICATION_CHANNEL) do |on|
+            on.message do |channel, message|
+              Sidekiq.logger.info "Received #{message} on #{channel}..."
+              handle_cancel_notification(message)
+            end
+          end
+        end
+      end
+
+      # Publishes jid on NOTIFICATION_CHANNEL so that subscribed monitors
+      # try to cancel the job.
+      #
+      # @param jid [String]
+      # @return [String] a confirmation message (assuming `with` returns the
+      #   value of its block -- TODO confirm)
+      def self.cancel_job(jid)
+        Gitlab::Redis::SharedState.with do |redis|
+          redis.publish(NOTIFICATION_CHANNEL, jid)
+          "Notification sent. Job should be cancelled soon. Check log to confirm. Jid: #{jid}"
+        end
+      end
+
+      private
+
+      # Handles a single notification. An error raised while cancelling one
+      # job is logged rather than re-raised, so that it does not propagate
+      # out of the subscription block and stop the listener.
+      def handle_cancel_notification(jid)
+        execute_job_cancel(jid)
+      rescue StandardError => e
+        Sidekiq.logger.warn "Failed to cancel job #{jid}: #{e.class}: #{e.message}"
+      end
+
+      # Marks the job as cancelled, then interrupts the thread running it in
+      # this process (if any) and logs the outcome.
+      def execute_job_cancel(jid)
+        Gitlab::SidekiqMiddleware::JobsThreads.mark_job_as_cancelled(jid)
+
+        thread = Gitlab::SidekiqMiddleware::JobsThreads
+          .interrupt(jid)
+
+        if thread
+          Sidekiq.logger.info "Interrupted thread: #{thread} for #{jid}."
+        else
+          Sidekiq.logger.info "Did not find thread for #{jid}."
+        end
+      end
+    end
+  end
+end
diff --git a/spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb b/spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb
new file mode 100644
index 00000000000..58cae3e42e0
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb
@@ -0,0 +1,101 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+describe Gitlab::SidekiqMiddleware::JobsThreads do
+  subject { described_class.new }
+
+  let(:worker) { double(:worker, class: Chaos::SleepWorker) }
+  let(:jid) { '581f90fbd2f24deabcbde2f9' }
+  let(:job) { { 'jid' => jid } }
+  let(:jid_thread) { '684f90fbd2f24deabcbde2f9' }
+  let(:job_thread) { { 'jid' => jid_thread } }
+  let(:queue) { 'test_queue' }
+  let(:mark_job_as_cancelled) { Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 2, 1) } }
+
+  # Cancellation markers outlive an example (2 seconds here, 24 hours for
+  # .mark_job_as_cancelled). Remove them so that a marker left by one example
+  # cannot cause the job in a later example to be skipped.
+  after do
+    Sidekiq.redis { |c| c.del("cancelled-#{jid}", 'cancelled-jid_123') }
+  end
+
+  def run_job
+    subject.call(worker, job, queue) do
+      sleep 2
+      "mock return from yield"
+    end
+  end
+
+  def run_job_thread
+    Thread.new do
+      subject.call(worker, job_thread, queue) do
+        sleep 3
+        "mock return from yield"
+      end
+    end
+  end
+
+  describe '.call' do
+    context 'by default' do
+      it 'return from yield' do
+        expect(run_job).to eq("mock return from yield")
+      end
+    end
+
+    context 'when job is marked as cancelled' do
+      before do
+        mark_job_as_cancelled
+      end
+
+      it 'return directly' do
+        expect(run_job).to be_nil
+      end
+    end
+  end
+
+  describe '.self.interrupt' do
+    let!(:running_thread) { run_job_thread }
+
+    before do
+      # Give the thread time to enter the middleware and register itself.
+      sleep 1
+    end
+
+    # The examples never really interrupt the job, so stop its thread here;
+    # otherwise it keeps running (and stays registered) into later examples.
+    after do
+      running_thread.kill
+      running_thread.join
+    end
+
+    it 'interrupt the job with correct jid' do
+      expect(described_class.jobs[jid_thread]).to receive(:raise).with(Interrupt)
+      expect(described_class.interrupt jid_thread).to eq(described_class.jobs[jid_thread])
+    end
+
+    it 'do nothing with wrong jid' do
+      expect(described_class.jobs[jid_thread]).not_to receive(:raise)
+      expect(described_class.interrupt 'wrong_jid').to be_nil
+    end
+  end
+
+  describe '.self.cancelled?' do
+    it 'return true when job is marked as cancelled' do
+      mark_job_as_cancelled
+      expect(described_class.cancelled? jid).to be true
+    end
+
+    it 'return false when job is not marked as cancelled' do
+      expect(described_class.cancelled? 'non-exists-jid').to be false
+    end
+  end
+
+  describe '.self.mark_job_as_cancelled' do
+    it 'set Redis key' do
+      described_class.mark_job_as_cancelled('jid_123')
+
+      expect(described_class.cancelled? 'jid_123').to be true
+    end
+  end
+end