author    Stan Hu <stanhu@gmail.com> 2019-08-21 18:37:36 +0000
committer Stan Hu <stanhu@gmail.com> 2019-08-21 18:37:36 +0000
commit    9a0c1f64f540c1f07b2742a4c1c3ef4bee749bfd (patch)
tree      f85db52685bc95983c23d470b3443f8ac79fa848
parent    5fb18d5766a198882e35dbbb332c8eee85320a61 (diff)
parent    8d17c4dae6b4662dddffe9e2ddca8100e8cd3d0b (diff)
download  gitlab-ce-9a0c1f64f540c1f07b2742a4c1c3ef4bee749bfd.tar.gz
Merge branch 'sidekiq-interrupt-running-jobs' into 'master'

Allow to interrupt running sidekiq jobs

See merge request gitlab-org/gitlab-ce!31818
-rw-r--r--  config/initializers/sidekiq.rb                      |   4
-rw-r--r--  doc/administration/troubleshooting/sidekiq.md       | 118
-rw-r--r--  lib/gitlab/daemon.rb                                |   5
-rw-r--r--  lib/gitlab/sidekiq_middleware/monitor.rb            |  16
-rw-r--r--  lib/gitlab/sidekiq_monitor.rb                       | 182
-rw-r--r--  spec/lib/gitlab/daemon_spec.rb                      |  30
-rw-r--r--  spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb  |  41
-rw-r--r--  spec/lib/gitlab/sidekiq_monitor_spec.rb             | 261
8 files changed, 650 insertions, 7 deletions
diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb
index 7217f098fd9..9f3e104bc2b 100644
--- a/config/initializers/sidekiq.rb
+++ b/config/initializers/sidekiq.rb
@@ -28,11 +28,13 @@ if Rails.env.development?
end
enable_json_logs = Gitlab.config.sidekiq.log_format == 'json'
+enable_sidekiq_monitor = ENV.fetch("SIDEKIQ_MONITOR_WORKER", 0).to_i.nonzero?
Sidekiq.configure_server do |config|
config.redis = queues_config_hash
config.server_middleware do |chain|
+ chain.add Gitlab::SidekiqMiddleware::Monitor if enable_sidekiq_monitor
chain.add Gitlab::SidekiqMiddleware::Metrics if Settings.monitoring.sidekiq_exporter
chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] && !enable_json_logs
chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS']
@@ -57,6 +59,8 @@ Sidekiq.configure_server do |config|
# Clear any connections that might have been obtained before starting
# Sidekiq (e.g. in an initializer).
ActiveRecord::Base.clear_all_connections!
+
+ Gitlab::SidekiqMonitor.instance.start if enable_sidekiq_monitor
end
if enable_reliable_fetch?
diff --git a/doc/administration/troubleshooting/sidekiq.md b/doc/administration/troubleshooting/sidekiq.md
index 7067958ecb4..9b016c64e29 100644
--- a/doc/administration/troubleshooting/sidekiq.md
+++ b/doc/administration/troubleshooting/sidekiq.md
@@ -169,3 +169,121 @@ The PostgreSQL wiki has details on the query you can run to see blocking
queries. The query is different based on PostgreSQL version. See
[Lock Monitoring](https://wiki.postgresql.org/wiki/Lock_Monitoring) for
the query details.
+
+## Managing Sidekiq queues
+
+It is possible to use the [Sidekiq API](https://github.com/mperham/sidekiq/wiki/API)
+to perform a number of troubleshooting steps on Sidekiq.
+
+These are administrative commands and they should only be used when the
+admin interface is not suitable due to the scale of the installation.
+
+All these commands should be run using `gitlab-rails console`.
+
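+For example, to get a quick overview you can list every queue together with its
+current size (a small sketch using `Sidekiq::Queue.all` from the Sidekiq API):
+
+```ruby
+# Print each queue name with the number of jobs waiting in it
+Sidekiq::Queue.all.each do |queue|
+  puts "#{queue.name}: #{queue.size}"
+end
+```
+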
+### View the queue size
+
+```ruby
+Sidekiq::Queue.new("pipeline_processing:build_queue").size
+```
+
+### Enumerate all enqueued jobs
+
+```ruby
+queue = Sidekiq::Queue.new("chaos:chaos_sleep")
+queue.each do |job|
+ # job.klass # => 'MyWorker'
+ # job.args # => [1, 2, 3]
+ # job.jid # => jid
+ # job.queue # => 'chaos:chaos_sleep'
+ # job["retry"] # => 3
+ # job.item # => {
+ # "class"=>"Chaos::SleepWorker",
+ # "args"=>[1000],
+ # "retry"=>3,
+ # "queue"=>"chaos:chaos_sleep",
+ # "backtrace"=>true,
+ # "queue_namespace"=>"chaos",
+ # "jid"=>"39bc482b823cceaf07213523",
+ # "created_at"=>1566317076.266069,
+ # "correlation_id"=>"c323b832-a857-4858-b695-672de6f0e1af",
+ # "enqueued_at"=>1566317076.26761
+ # }
+
+ # job.delete if job.jid == 'abcdef1234567890'
+end
+```
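+
+Since `Sidekiq::Queue` includes `Enumerable`, you can build on this to tally
+enqueued jobs by worker class (a small sketch; the queue name is illustrative):
+
+```ruby
+queue = Sidekiq::Queue.new("chaos:chaos_sleep")
+
+# Count enqueued jobs per worker class
+counts = queue.each_with_object(Hash.new(0)) do |job, tally|
+  tally[job.klass] += 1
+end
+```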
+
+### Enumerate currently running jobs
+
+```ruby
+workers = Sidekiq::Workers.new
+workers.each do |process_id, thread_id, work|
+ # process_id is a unique identifier per Sidekiq process
+ # thread_id is a unique identifier per thread
+ # work is a Hash which looks like:
+ # {"queue"=>"chaos:chaos_sleep",
+ # "payload"=>
+ # { "class"=>"Chaos::SleepWorker",
+ # "args"=>[1000],
+ # "retry"=>3,
+ # "queue"=>"chaos:chaos_sleep",
+ # "backtrace"=>true,
+ # "queue_namespace"=>"chaos",
+ # "jid"=>"b2a31e3eac7b1a99ff235869",
+ # "created_at"=>1566316974.9215662,
+ # "correlation_id"=>"e484fb26-7576-45f9-bf21-b99389e1c53c",
+ # "enqueued_at"=>1566316974.9229589},
+ # "run_at"=>1566316974}
+end
+```
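+
+Building on the above, you can collect the `jid`s of jobs currently running on
+a given queue (a sketch; `Sidekiq::Workers` includes `Enumerable`, and the
+queue name is illustrative):
+
+```ruby
+# Select the worker triples whose current job belongs to the queue
+running = Sidekiq::Workers.new.select do |_process_id, _thread_id, work|
+  work['queue'] == 'chaos:chaos_sleep'
+end
+
+# Extract the job IDs from the work payloads
+jids = running.map { |_process_id, _thread_id, work| work.dig('payload', 'jid') }
+```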
+
+### Remove Sidekiq jobs for given parameters (destructive)
+
+```ruby
+# for jobs like this:
+# RepositoryImportWorker.new.perform_async(100)
+id_list = [100]
+
+queue = Sidekiq::Queue.new('repository_import')
+queue.each do |job|
+ job.delete if id_list.include?(job.args[0])
+end
+```
+
+### Remove specific job ID (destructive)
+
+```ruby
+queue = Sidekiq::Queue.new('repository_import')
+queue.each do |job|
+ job.delete if job.jid == 'my-job-id'
+end
+```
+
+## Canceling running jobs (destructive)
+
+> Introduced in GitLab 12.3.
+
+This is a highly risky operation and should only be used as a last resort.
+Doing so might result in data corruption, as the job
+is interrupted mid-execution and it is not guaranteed
+that a proper rollback of transactions is implemented.
+
+```ruby
+Gitlab::SidekiqMonitor.cancel_job('job-id')
+```
+
+> This requires Sidekiq to be run with the `SIDEKIQ_MONITOR_WORKER=1`
+> environment variable.
+
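+For example, to cancel every job currently running on a given queue, you can
+combine the `Sidekiq::Workers` enumeration shown above with `cancel_job`
+(a hedged sketch; the queue name is illustrative):
+
+```ruby
+Sidekiq::Workers.new.each do |_process_id, _thread_id, work|
+  next unless work['queue'] == 'chaos:chaos_sleep'
+
+  # Request cancellation of the job currently running on this thread
+  Gitlab::SidekiqMonitor.cancel_job(work.dig('payload', 'jid'))
+end
+```
+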
+To perform the interrupt, we use `Thread.raise`, which
+has a number of drawbacks, as mentioned in [Why Ruby’s Timeout is dangerous (and Thread.raise is terrifying)](https://jvns.ca/blog/2015/11/27/why-rubys-timeout-is-dangerous-and-thread-dot-raise-is-terrifying/):
+
+> This is where the implications get interesting, and terrifying. This means that an exception can get raised:
+>
+> * during a network request (ok, as long as the surrounding code is prepared to catch Timeout::Error)
+> * during the cleanup for the network request
+> * during a rescue block
+> * while creating an object to save to the database afterwards
+> * in any of your code, regardless of whether it could have possibly raised an exception before
+>
+> Nobody writes code to defend against an exception being raised on literally any line. That’s not even possible. So Thread.raise is basically like a sneak attack on your code that could result in almost anything. It would probably be okay if it were pure-functional code that did not modify any state. But this is Ruby, so that’s unlikely :)
diff --git a/lib/gitlab/daemon.rb b/lib/gitlab/daemon.rb
index 6d5fc4219fb..2f4ae010e74 100644
--- a/lib/gitlab/daemon.rb
+++ b/lib/gitlab/daemon.rb
@@ -46,7 +46,10 @@ module Gitlab
if thread
thread.wakeup if thread.alive?
- thread.join unless Thread.current == thread
+ begin
+ thread.join unless Thread.current == thread
+ rescue Exception # rubocop:disable Lint/RescueException
+ end
@thread = nil
end
end
diff --git a/lib/gitlab/sidekiq_middleware/monitor.rb b/lib/gitlab/sidekiq_middleware/monitor.rb
new file mode 100644
index 00000000000..0d88fe760d3
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/monitor.rb
@@ -0,0 +1,16 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ class Monitor
+ def call(worker, job, queue)
+ Gitlab::SidekiqMonitor.instance.within_job(job['jid'], queue) do
+ yield
+ end
+ rescue Gitlab::SidekiqMonitor::CancelledError
+ # ignore retries
+ raise Sidekiq::JobRetry::Skip
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_monitor.rb b/lib/gitlab/sidekiq_monitor.rb
new file mode 100644
index 00000000000..9842f1f53f7
--- /dev/null
+++ b/lib/gitlab/sidekiq_monitor.rb
@@ -0,0 +1,182 @@
+# frozen_string_literal: true
+
+module Gitlab
+ class SidekiqMonitor < Daemon
+ include ::Gitlab::Utils::StrongMemoize
+
+ NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze
+ CANCEL_DEADLINE = 24.hours.seconds
+ RECONNECT_TIME = 3.seconds
+
+ # We use an exception derived from `Exception`
+ # so that it is treated as a very low-level exception
+ # that should not be caught by the application
+ CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException
+
+ attr_reader :jobs_thread
+ attr_reader :jobs_mutex
+
+ def initialize
+ super
+
+ @jobs_thread = {}
+ @jobs_mutex = Mutex.new
+ end
+
+ def within_job(jid, queue)
+ jobs_mutex.synchronize do
+ jobs_thread[jid] = Thread.current
+ end
+
+ if cancelled?(jid)
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'run',
+ queue: queue,
+ jid: jid,
+ canceled: true
+ )
+ raise CancelledError
+ end
+
+ yield
+ ensure
+ jobs_mutex.synchronize do
+ jobs_thread.delete(jid)
+ end
+ end
+
+ def self.cancel_job(jid)
+ payload = {
+ action: 'cancel',
+ jid: jid
+ }.to_json
+
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
+ redis.publish(NOTIFICATION_CHANNEL, payload)
+ end
+ end
+
+ private
+
+ def start_working
+ Sidekiq.logger.info(
+ class: self.class.to_s,
+ action: 'start',
+ message: 'Starting Monitor Daemon'
+ )
+
+ while enabled?
+ process_messages
+ sleep(RECONNECT_TIME)
+ end
+
+ ensure
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'stop',
+ message: 'Stopping Monitor Daemon'
+ )
+ end
+
+ def stop_working
+ thread.raise(Interrupt) if thread.alive?
+ end
+
+ def process_messages
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.subscribe(NOTIFICATION_CHANNEL) do |on|
+ on.message do |channel, message|
+ process_message(message)
+ end
+ end
+ end
+ rescue Exception => e # rubocop:disable Lint/RescueException
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'exception',
+ message: e.message
+ )
+
+ # we re-raise system exceptions
+ raise e unless e.is_a?(StandardError)
+ end
+
+ def process_message(message)
+ Sidekiq.logger.info(
+ class: self.class.to_s,
+ channel: NOTIFICATION_CHANNEL,
+ message: 'Received payload on channel',
+ payload: message
+ )
+
+ message = safe_parse(message)
+ return unless message
+
+ case message['action']
+ when 'cancel'
+ process_job_cancel(message['jid'])
+ else
+ # unknown message
+ end
+ end
+
+ def safe_parse(message)
+ JSON.parse(message)
+ rescue JSON::ParserError
+ end
+
+ def process_job_cancel(jid)
+ return unless jid
+
+ # try to find the thread without taking a lock
+ return unless find_thread_unsafe(jid)
+
+ Thread.new do
+ # try to find the thread again, this time with a
+ # guarantee that the handle corresponds to an
+ # actually running job
+ find_thread_with_lock(jid) do |thread|
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'cancel',
+ message: 'Canceling thread with CancelledError',
+ jid: jid,
+ thread_id: thread.object_id
+ )
+
+ thread&.raise(CancelledError)
+ end
+ end
+ end
+
+ # Finding the thread for a job needs to be thread-safe;
+ # this is why `find_thread_with_lock` yields the thread
+ # inside the block, to ensure we process a live thread
+ def find_thread_unsafe(jid)
+ jobs_thread[jid]
+ end
+
+ def find_thread_with_lock(jid)
+ # don't try to lock if we cannot find the thread
+ return unless find_thread_unsafe(jid)
+
+ jobs_mutex.synchronize do
+ find_thread_unsafe(jid).tap do |thread|
+ yield(thread) if thread
+ end
+ end
+ end
+
+ def cancelled?(jid)
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.exists(self.class.cancel_job_key(jid))
+ end
+ end
+
+ def self.cancel_job_key(jid)
+ "sidekiq:cancel:#{jid}"
+ end
+ end
+end
diff --git a/spec/lib/gitlab/daemon_spec.rb b/spec/lib/gitlab/daemon_spec.rb
index d3e73314b87..0372b770844 100644
--- a/spec/lib/gitlab/daemon_spec.rb
+++ b/spec/lib/gitlab/daemon_spec.rb
@@ -34,12 +34,12 @@ describe Gitlab::Daemon do
end
end
- describe 'when Daemon is enabled' do
+ context 'when Daemon is enabled' do
before do
allow(subject).to receive(:enabled?).and_return(true)
end
- describe 'when Daemon is stopped' do
+ context 'when Daemon is stopped' do
describe '#start' do
it 'starts the Daemon' do
expect { subject.start.join }.to change { subject.thread? }.from(false).to(true)
@@ -57,14 +57,14 @@ describe Gitlab::Daemon do
end
end
- describe 'when Daemon is running' do
+ context 'when Daemon is running' do
before do
- subject.start.join
+ subject.start
end
describe '#start' do
it "doesn't start running Daemon" do
- expect { subject.start.join }.not_to change { subject.thread? }
+ expect { subject.start.join }.not_to change { subject.thread }
expect(subject).to have_received(:start_working).once
end
@@ -76,11 +76,29 @@ describe Gitlab::Daemon do
expect(subject).to have_received(:stop_working)
end
+
+ context 'when stop_working raises exception' do
+ before do
+ allow(subject).to receive(:start_working) do
+ sleep(1000)
+ end
+ end
+
+ it 'shuts down the Daemon' do
+ expect(subject).to receive(:stop_working) do
+ subject.thread.raise(Interrupt)
+ end
+
+ expect(subject.thread).to be_alive
+ expect { subject.stop }.not_to raise_error
+ expect(subject.thread).to be_nil
+ end
+ end
end
end
end
- describe 'when Daemon is disabled' do
+ context 'when Daemon is disabled' do
before do
allow(subject).to receive(:enabled?).and_return(false)
end
diff --git a/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb
new file mode 100644
index 00000000000..2933d26a387
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb
@@ -0,0 +1,41 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+describe Gitlab::SidekiqMiddleware::Monitor do
+ let(:monitor) { described_class.new }
+
+ describe '#call' do
+ let(:worker) { double }
+ let(:job) { { 'jid' => 'job-id' } }
+ let(:queue) { 'my-queue' }
+
+ it 'calls SidekiqMonitor' do
+ expect(Gitlab::SidekiqMonitor.instance).to receive(:within_job)
+ .with('job-id', 'my-queue')
+ .and_call_original
+
+ expect { |blk| monitor.call(worker, job, queue, &blk) }.to yield_control
+ end
+
+ it 'passes through the return value' do
+ result = monitor.call(worker, job, queue) do
+ 'value'
+ end
+
+ expect(result).to eq('value')
+ end
+
+ context 'when cancel happens' do
+ subject do
+ monitor.call(worker, job, queue) do
+ raise Gitlab::SidekiqMonitor::CancelledError
+ end
+ end
+
+ it 'skips the job' do
+ expect { subject }.to raise_error(Sidekiq::JobRetry::Skip)
+ end
+ end
+ end
+end
diff --git a/spec/lib/gitlab/sidekiq_monitor_spec.rb b/spec/lib/gitlab/sidekiq_monitor_spec.rb
new file mode 100644
index 00000000000..bbd7bf90217
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_monitor_spec.rb
@@ -0,0 +1,261 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+describe Gitlab::SidekiqMonitor do
+ let(:monitor) { described_class.new }
+
+ describe '#within_job' do
+ it 'tracks thread' do
+ blk = proc do
+ expect(monitor.jobs_thread['jid']).not_to be_nil
+
+ "OK"
+ end
+
+ expect(monitor.within_job('jid', 'queue', &blk)).to eq("OK")
+ end
+
+ context 'when job is canceled' do
+ let(:jid) { SecureRandom.hex }
+
+ before do
+ described_class.cancel_job(jid)
+ end
+
+ it 'does not execute a block' do
+ expect do |blk|
+ monitor.within_job(jid, 'queue', &blk)
+ rescue described_class::CancelledError
+ end.not_to yield_control
+ end
+
+ it 'raises exception' do
+ expect { monitor.within_job(jid, 'queue') }.to raise_error(
+ described_class::CancelledError)
+ end
+ end
+ end
+
+ describe '#start_working' do
+ subject { monitor.send(:start_working) }
+
+ before do
+ # we want to run at most one cycle
+ # we toggle the `enabled?` flag after the first call
+ stub_const('Gitlab::SidekiqMonitor::RECONNECT_TIME', 0)
+ allow(monitor).to receive(:enabled?).and_return(true, false)
+
+ allow(Sidekiq.logger).to receive(:info)
+ allow(Sidekiq.logger).to receive(:warn)
+ end
+
+ context 'when structured logging is used' do
+ it 'logs start message' do
+ expect(Sidekiq.logger).to receive(:info)
+ .with(
+ class: described_class.to_s,
+ action: 'start',
+ message: 'Starting Monitor Daemon')
+
+ expect(::Gitlab::Redis::SharedState).to receive(:with)
+
+ subject
+ end
+
+ it 'logs stop message' do
+ expect(Sidekiq.logger).to receive(:warn)
+ .with(
+ class: described_class.to_s,
+ action: 'stop',
+ message: 'Stopping Monitor Daemon')
+
+ expect(::Gitlab::Redis::SharedState).to receive(:with)
+
+ subject
+ end
+
+ it 'logs StandardError message' do
+ expect(Sidekiq.logger).to receive(:warn)
+ .with(
+ class: described_class.to_s,
+ action: 'exception',
+ message: 'My Exception')
+
+ expect(::Gitlab::Redis::SharedState).to receive(:with)
+ .and_raise(StandardError, 'My Exception')
+
+ expect { subject }.not_to raise_error
+ end
+
+ it 'logs and raises Exception message' do
+ expect(Sidekiq.logger).to receive(:warn)
+ .with(
+ class: described_class.to_s,
+ action: 'exception',
+ message: 'My Exception')
+
+ expect(::Gitlab::Redis::SharedState).to receive(:with)
+ .and_raise(Exception, 'My Exception')
+
+ expect { subject }.to raise_error(Exception, 'My Exception')
+ end
+ end
+
+ context 'when StandardError is raised' do
+ it 'does retry the connection' do
+ expect(::Gitlab::Redis::SharedState).to receive(:with)
+ .and_raise(StandardError, 'My Exception')
+
+ expect(::Gitlab::Redis::SharedState).to receive(:with)
+
+ # we expect to run `process_messages` twice
+ expect(monitor).to receive(:enabled?).and_return(true, true, false)
+
+ subject
+ end
+ end
+
+ context 'when message is published' do
+ let(:subscribed) { double }
+
+ before do
+ expect_any_instance_of(::Redis).to receive(:subscribe)
+ .and_yield(subscribed)
+
+ expect(subscribed).to receive(:message)
+ .and_yield(
+ described_class::NOTIFICATION_CHANNEL,
+ payload
+ )
+
+ expect(Sidekiq.logger).to receive(:info)
+ .with(
+ class: described_class.to_s,
+ action: 'start',
+ message: 'Starting Monitor Daemon')
+
+ expect(Sidekiq.logger).to receive(:info)
+ .with(
+ class: described_class.to_s,
+ channel: described_class::NOTIFICATION_CHANNEL,
+ message: 'Received payload on channel',
+ payload: payload
+ )
+ end
+
+ context 'and message is valid' do
+ let(:payload) { '{"action":"cancel","jid":"my-jid"}' }
+
+ it 'processes cancel' do
+ expect(monitor).to receive(:process_job_cancel).with('my-jid')
+
+ subject
+ end
+ end
+
+ context 'and message is not valid json' do
+ let(:payload) { '{"action"}' }
+
+ it 'skips processing' do
+ expect(monitor).not_to receive(:process_job_cancel)
+
+ subject
+ end
+ end
+ end
+ end
+
+ describe '#stop' do
+ let!(:monitor_thread) { monitor.start }
+
+ it 'does stop the thread' do
+ expect(monitor_thread).to be_alive
+
+ expect { monitor.stop }.not_to raise_error
+
+ expect(monitor_thread).not_to be_alive
+ expect { monitor_thread.value }.to raise_error(Interrupt)
+ end
+ end
+
+ describe '#process_job_cancel' do
+ subject { monitor.send(:process_job_cancel, jid) }
+
+ context 'when jid is missing' do
+ let(:jid) { nil }
+
+ it 'does not run a thread' do
+ expect(subject).to be_nil
+ end
+ end
+
+ context 'when jid is provided' do
+ let(:jid) { 'my-jid' }
+
+ context 'when jid is not found' do
+ it 'does not log cancellation message' do
+ expect(Sidekiq.logger).not_to receive(:warn)
+ expect(subject).to be_nil
+ end
+ end
+
+ context 'when jid is found' do
+ let(:thread) { Thread.new { sleep 1000 } }
+
+ before do
+ monitor.jobs_thread[jid] = thread
+ end
+
+ after do
+ thread.kill
+ rescue
+ end
+
+ it 'does log cancellation message' do
+ expect(Sidekiq.logger).to receive(:warn)
+ .with(
+ class: described_class.to_s,
+ action: 'cancel',
+ message: 'Canceling thread with CancelledError',
+ jid: 'my-jid',
+ thread_id: thread.object_id)
+
+ expect(subject).to be_a(Thread)
+
+ subject.join
+ end
+
+ it 'does cancel the thread' do
+ expect(subject).to be_a(Thread)
+
+ subject.join
+
+ # we wait for the thread to be cancelled
+ # by `process_job_cancel`
+ expect { thread.join(5) }.to raise_error(described_class::CancelledError)
+ end
+ end
+ end
+ end
+
+ describe '.cancel_job' do
+ subject { described_class.cancel_job('my-jid') }
+
+ it 'sets a redis key' do
+ expect_any_instance_of(::Redis).to receive(:setex)
+ .with('sidekiq:cancel:my-jid', anything, 1)
+
+ subject
+ end
+
+ it 'notifies all workers' do
+ payload = '{"action":"cancel","jid":"my-jid"}'
+
+ expect_any_instance_of(::Redis).to receive(:publish)
+ .with('sidekiq:cancel:notifications', payload)
+
+ subject
+ end
+ end
+end