diff options
-rw-r--r-- | config/initializers/sidekiq.rb | 4 | ||||
-rw-r--r-- | doc/administration/troubleshooting/sidekiq.md | 118 | ||||
-rw-r--r-- | lib/gitlab/daemon.rb | 5 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_middleware/monitor.rb | 16 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_monitor.rb | 182 | ||||
-rw-r--r-- | spec/lib/gitlab/daemon_spec.rb | 30 | ||||
-rw-r--r-- | spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb | 41 | ||||
-rw-r--r-- | spec/lib/gitlab/sidekiq_monitor_spec.rb | 261 |
8 files changed, 650 insertions, 7 deletions
diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index 7217f098fd9..9f3e104bc2b 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -28,11 +28,13 @@ if Rails.env.development? end enable_json_logs = Gitlab.config.sidekiq.log_format == 'json' +enable_sidekiq_monitor = ENV.fetch("SIDEKIQ_MONITOR_WORKER", 0).to_i.nonzero? Sidekiq.configure_server do |config| config.redis = queues_config_hash config.server_middleware do |chain| + chain.add Gitlab::SidekiqMiddleware::Monitor if enable_sidekiq_monitor chain.add Gitlab::SidekiqMiddleware::Metrics if Settings.monitoring.sidekiq_exporter chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] && !enable_json_logs chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] @@ -57,6 +59,8 @@ Sidekiq.configure_server do |config| # Clear any connections that might have been obtained before starting # Sidekiq (e.g. in an initializer). ActiveRecord::Base.clear_all_connections! + + Gitlab::SidekiqMonitor.instance.start if enable_sidekiq_monitor end if enable_reliable_fetch? diff --git a/doc/administration/troubleshooting/sidekiq.md b/doc/administration/troubleshooting/sidekiq.md index 7067958ecb4..9b016c64e29 100644 --- a/doc/administration/troubleshooting/sidekiq.md +++ b/doc/administration/troubleshooting/sidekiq.md @@ -169,3 +169,121 @@ The PostgreSQL wiki has details on the query you can run to see blocking queries. The query is different based on PostgreSQL version. See [Lock Monitoring](https://wiki.postgresql.org/wiki/Lock_Monitoring) for the query details. + +## Managing Sidekiq queues + +It is possible to use [Sidekiq API](https://github.com/mperham/sidekiq/wiki/API) +to perform a number of troubleshoting on Sidekiq. + +These are the administrative commands and it should only be used if currently +admin interface is not suitable due to scale of installation. + +All this commands should be run using `gitlab-rails console`. + +### View the queue size + +```ruby +Sidekiq::Queue.new("pipeline_processing:build_queue").size +``` + +### Enumerate all enqueued jobs + +```ruby +queue = Sidekiq::Queue.new("chaos:chaos_sleep") +queue.each do |job| + # job.klass # => 'MyWorker' + # job.args # => [1, 2, 3] + # job.jid # => jid + # job.queue # => chaos:chaos_sleep + # job["retry"] # => 3 + # job.item # => { + # "class"=>"Chaos::SleepWorker", + # "args"=>[1000], + # "retry"=>3, + # "queue"=>"chaos:chaos_sleep", + # "backtrace"=>true, + # "queue_namespace"=>"chaos", + # "jid"=>"39bc482b823cceaf07213523", + # "created_at"=>1566317076.266069, + # "correlation_id"=>"c323b832-a857-4858-b695-672de6f0e1af", + # "enqueued_at"=>1566317076.26761}, + # } + + # job.delete if job.jid == 'abcdef1234567890' +end +``` + +### Enumerate currently running jobs + +```ruby +workers = Sidekiq::Workers.new +workers.each do |process_id, thread_id, work| + # process_id is a unique identifier per Sidekiq process + # thread_id is a unique identifier per thread + # work is a Hash which looks like: + # {"queue"=>"chaos:chaos_sleep", + # "payload"=> + # { "class"=>"Chaos::SleepWorker", + # "args"=>[1000], + # "retry"=>3, + # "queue"=>"chaos:chaos_sleep", + # "backtrace"=>true, + # "queue_namespace"=>"chaos", + # "jid"=>"b2a31e3eac7b1a99ff235869", + # "created_at"=>1566316974.9215662, + # "correlation_id"=>"e484fb26-7576-45f9-bf21-b99389e1c53c", + # "enqueued_at"=>1566316974.9229589}, + # "run_at"=>1566316974}], +end +``` + +### Remove sidekiq jobs for given parameters (destructive) + +```ruby +# for jobs like this: +# RepositoryImportWorker.new.perform_async(100) +id_list = [100] + +queue = Sidekiq::Queue.new('repository_import') +queue.each do |job| + job.delete if id_list.include?(job.args[0]) +end +``` + +### Remove specific job ID (destructive) + +```ruby +queue = Sidekiq::Queue.new('repository_import') +queue.each do |job| + job.delete if job.jid == 'my-job-id' +end +``` + +## Canceling running jobs (destructive) + +> Introduced in GitLab 12.3. + +This is highly risky operation and use it as last resort. +Doing that might result in data corruption, as the job +is interrupted mid-execution and it is not guaranteed +that proper rollback of transactions is implemented. + +```ruby +Gitlab::SidekiqMonitor.cancel_job('job-id') +``` + +> This requires the Sidekiq to be run with `SIDEKIQ_MONITOR_WORKER=1` +> environment variable. + +To perform of the interrupt we use `Thread.raise` which +has number of drawbacks, as mentioned in [Why Ruby’s Timeout is dangerous (and Thread.raise is terrifying)](https://jvns.ca/blog/2015/11/27/why-rubys-timeout-is-dangerous-and-thread-dot-raise-is-terrifying/): + +> This is where the implications get interesting, and terrifying. This means that an exception can get raised: +> +> * during a network request (ok, as long as the surrounding code is prepared to catch Timeout::Error) +> * during the cleanup for the network request +> * during a rescue block +> * while creating an object to save to the database afterwards +> * in any of your code, regardless of whether it could have possibly raised an exception before +> +> Nobody writes code to defend against an exception being raised on literally any line. That’s not even possible. So Thread.raise is basically like a sneak attack on your code that could result in almost anything. It would probably be okay if it were pure-functional code that did not modify any state. But this is Ruby, so that’s unlikely :) diff --git a/lib/gitlab/daemon.rb b/lib/gitlab/daemon.rb index 6d5fc4219fb..2f4ae010e74 100644 --- a/lib/gitlab/daemon.rb +++ b/lib/gitlab/daemon.rb @@ -46,7 +46,10 @@ module Gitlab if thread thread.wakeup if thread.alive? - thread.join unless Thread.current == thread + begin + thread.join unless Thread.current == thread + rescue Exception # rubocop:disable Lint/RescueException + end @thread = nil end end diff --git a/lib/gitlab/sidekiq_middleware/monitor.rb b/lib/gitlab/sidekiq_middleware/monitor.rb new file mode 100644 index 00000000000..0d88fe760d3 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/monitor.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + class Monitor + def call(worker, job, queue) + Gitlab::SidekiqMonitor.instance.within_job(job['jid'], queue) do + yield + end + rescue Gitlab::SidekiqMonitor::CancelledError + # ignore retries + raise Sidekiq::JobRetry::Skip + end + end + end +end diff --git a/lib/gitlab/sidekiq_monitor.rb b/lib/gitlab/sidekiq_monitor.rb new file mode 100644 index 00000000000..9842f1f53f7 --- /dev/null +++ b/lib/gitlab/sidekiq_monitor.rb @@ -0,0 +1,182 @@ +# frozen_string_literal: true + +module Gitlab + class SidekiqMonitor < Daemon + include ::Gitlab::Utils::StrongMemoize + + NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze + CANCEL_DEADLINE = 24.hours.seconds + RECONNECT_TIME = 3.seconds + + # We use exception derived from `Exception` + # to consider this as an very low-level exception + # that should not be caught by application + CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException + + attr_reader :jobs_thread + attr_reader :jobs_mutex + + def initialize + super + + @jobs_thread = {} + @jobs_mutex = Mutex.new + end + + def within_job(jid, queue) + jobs_mutex.synchronize do + jobs_thread[jid] = Thread.current + end + + if cancelled?(jid) + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'run', + queue: queue, + jid: jid, + canceled: true + ) + raise CancelledError + end + + yield + ensure + jobs_mutex.synchronize do + jobs_thread.delete(jid) + end + end + + def self.cancel_job(jid) + payload = { + action: 'cancel', + jid: jid + }.to_json + + ::Gitlab::Redis::SharedState.with do |redis| + redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1) + redis.publish(NOTIFICATION_CHANNEL, payload) + end + end + + private + + def start_working + Sidekiq.logger.info( + class: self.class.to_s, + action: 'start', + message: 'Starting Monitor Daemon' + ) + + while enabled? + process_messages + sleep(RECONNECT_TIME) + end + + ensure + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'stop', + message: 'Stopping Monitor Daemon' + ) + end + + def stop_working + thread.raise(Interrupt) if thread.alive? + end + + def process_messages + ::Gitlab::Redis::SharedState.with do |redis| + redis.subscribe(NOTIFICATION_CHANNEL) do |on| + on.message do |channel, message| + process_message(message) + end + end + end + rescue Exception => e # rubocop:disable Lint/RescueException + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'exception', + message: e.message + ) + + # we re-raise system exceptions + raise e unless e.is_a?(StandardError) + end + + def process_message(message) + Sidekiq.logger.info( + class: self.class.to_s, + channel: NOTIFICATION_CHANNEL, + message: 'Received payload on channel', + payload: message + ) + + message = safe_parse(message) + return unless message + + case message['action'] + when 'cancel' + process_job_cancel(message['jid']) + else + # unknown message + end + end + + def safe_parse(message) + JSON.parse(message) + rescue JSON::ParserError + end + + def process_job_cancel(jid) + return unless jid + + # try to find thread without lock + return unless find_thread_unsafe(jid) + + Thread.new do + # try to find a thread, but with guaranteed + # that handle for thread corresponds to actually + # running job + find_thread_with_lock(jid) do |thread| + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'cancel', + message: 'Canceling thread with CancelledError', + jid: jid, + thread_id: thread.object_id + ) + + thread&.raise(CancelledError) + end + end + end + + # This method needs to be thread-safe + # This is why it passes thread in block, + # to ensure that we do process this thread + def find_thread_unsafe(jid) + jobs_thread[jid] + end + + def find_thread_with_lock(jid) + # don't try to lock if we cannot find the thread + return unless find_thread_unsafe(jid) + + jobs_mutex.synchronize do + find_thread_unsafe(jid).tap do |thread| + yield(thread) if thread + end + end + end + + def cancelled?(jid) + ::Gitlab::Redis::SharedState.with do |redis| + redis.exists(self.class.cancel_job_key(jid)) + end + end + + def self.cancel_job_key(jid) + "sidekiq:cancel:#{jid}" + end + end +end diff --git a/spec/lib/gitlab/daemon_spec.rb b/spec/lib/gitlab/daemon_spec.rb index d3e73314b87..0372b770844 100644 --- a/spec/lib/gitlab/daemon_spec.rb +++ b/spec/lib/gitlab/daemon_spec.rb @@ -34,12 +34,12 @@ describe Gitlab::Daemon do end end - describe 'when Daemon is enabled' do + context 'when Daemon is enabled' do before do allow(subject).to receive(:enabled?).and_return(true) end - describe 'when Daemon is stopped' do + context 'when Daemon is stopped' do describe '#start' do it 'starts the Daemon' do expect { subject.start.join }.to change { subject.thread? }.from(false).to(true) @@ -57,14 +57,14 @@ describe Gitlab::Daemon do end end - describe 'when Daemon is running' do + context 'when Daemon is running' do before do - subject.start.join + subject.start end describe '#start' do it "doesn't start running Daemon" do - expect { subject.start.join }.not_to change { subject.thread? } + expect { subject.start.join }.not_to change { subject.thread } expect(subject).to have_received(:start_working).once end @@ -76,11 +76,29 @@ describe Gitlab::Daemon do expect(subject).to have_received(:stop_working) end + + context 'when stop_working raises exception' do + before do + allow(subject).to receive(:start_working) do + sleep(1000) + end + end + + it 'shutdowns Daemon' do + expect(subject).to receive(:stop_working) do + subject.thread.raise(Interrupt) + end + + expect(subject.thread).to be_alive + expect { subject.stop }.not_to raise_error + expect(subject.thread).to be_nil + end + end end end end - describe 'when Daemon is disabled' do + context 'when Daemon is disabled' do before do allow(subject).to receive(:enabled?).and_return(false) end diff --git a/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb new file mode 100644 index 00000000000..2933d26a387 --- /dev/null +++ b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Gitlab::SidekiqMiddleware::Monitor do + let(:monitor) { described_class.new } + + describe '#call' do + let(:worker) { double } + let(:job) { { 'jid' => 'job-id' } } + let(:queue) { 'my-queue' } + + it 'calls SidekiqMonitor' do + expect(Gitlab::SidekiqMonitor.instance).to receive(:within_job) + .with('job-id', 'my-queue') + .and_call_original + + expect { |blk| monitor.call(worker, job, queue, &blk) }.to yield_control + end + + it 'passthroughs the return value' do + result = monitor.call(worker, job, queue) do + 'value' + end + + expect(result).to eq('value') + end + + context 'when cancel happens' do + subject do + monitor.call(worker, job, queue) do + raise Gitlab::SidekiqMonitor::CancelledError + end + end + + it 'does skip this job' do + expect { subject }.to raise_error(Sidekiq::JobRetry::Skip) + end + end + end +end diff --git a/spec/lib/gitlab/sidekiq_monitor_spec.rb b/spec/lib/gitlab/sidekiq_monitor_spec.rb new file mode 100644 index 00000000000..bbd7bf90217 --- /dev/null +++ b/spec/lib/gitlab/sidekiq_monitor_spec.rb @@ -0,0 +1,261 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Gitlab::SidekiqMonitor do + let(:monitor) { described_class.new } + + describe '#within_job' do + it 'tracks thread' do + blk = proc do + expect(monitor.jobs_thread['jid']).not_to be_nil + + "OK" + end + + expect(monitor.within_job('jid', 'queue', &blk)).to eq("OK") + end + + context 'when job is canceled' do + let(:jid) { SecureRandom.hex } + + before do + described_class.cancel_job(jid) + end + + it 'does not execute a block' do + expect do |blk| + monitor.within_job(jid, 'queue', &blk) + rescue described_class::CancelledError + end.not_to yield_control + end + + it 'raises exception' do + expect { monitor.within_job(jid, 'queue') }.to raise_error( + described_class::CancelledError) + end + end + end + + describe '#start_working' do + subject { monitor.send(:start_working) } + + before do + # we want to run at most once cycle + # we toggle `enabled?` flag after the first call + stub_const('Gitlab::SidekiqMonitor::RECONNECT_TIME', 0) + allow(monitor).to receive(:enabled?).and_return(true, false) + + allow(Sidekiq.logger).to receive(:info) + allow(Sidekiq.logger).to receive(:warn) + end + + context 'when structured logging is used' do + it 'logs start message' do + expect(Sidekiq.logger).to receive(:info) + .with( + class: described_class.to_s, + action: 'start', + message: 'Starting Monitor Daemon') + + expect(::Gitlab::Redis::SharedState).to receive(:with) + + subject + end + + it 'logs stop message' do + expect(Sidekiq.logger).to receive(:warn) + .with( + class: described_class.to_s, + action: 'stop', + message: 'Stopping Monitor Daemon') + + expect(::Gitlab::Redis::SharedState).to receive(:with) + + subject + end + + it 'logs StandardError message' do + expect(Sidekiq.logger).to receive(:warn) + .with( + class: described_class.to_s, + action: 'exception', + message: 'My Exception') + + expect(::Gitlab::Redis::SharedState).to receive(:with) + .and_raise(StandardError, 'My Exception') + + expect { subject }.not_to raise_error + end + + it 'logs and raises Exception message' do + expect(Sidekiq.logger).to receive(:warn) + .with( + class: described_class.to_s, + action: 'exception', + message: 'My Exception') + + expect(::Gitlab::Redis::SharedState).to receive(:with) + .and_raise(Exception, 'My Exception') + + expect { subject }.to raise_error(Exception, 'My Exception') + end + end + + context 'when StandardError is raised' do + it 'does retry connection' do + expect(::Gitlab::Redis::SharedState).to receive(:with) + .and_raise(StandardError, 'My Exception') + + expect(::Gitlab::Redis::SharedState).to receive(:with) + + # we expect to run `process_messages` twice + expect(monitor).to receive(:enabled?).and_return(true, true, false) + + subject + end + end + + context 'when message is published' do + let(:subscribed) { double } + + before do + expect_any_instance_of(::Redis).to receive(:subscribe) + .and_yield(subscribed) + + expect(subscribed).to receive(:message) + .and_yield( + described_class::NOTIFICATION_CHANNEL, + payload + ) + + expect(Sidekiq.logger).to receive(:info) + .with( + class: described_class.to_s, + action: 'start', + message: 'Starting Monitor Daemon') + + expect(Sidekiq.logger).to receive(:info) + .with( + class: described_class.to_s, + channel: described_class::NOTIFICATION_CHANNEL, + message: 'Received payload on channel', + payload: payload + ) + end + + context 'and message is valid' do + let(:payload) { '{"action":"cancel","jid":"my-jid"}' } + + it 'processes cancel' do + expect(monitor).to receive(:process_job_cancel).with('my-jid') + + subject + end + end + + context 'and message is not valid json' do + let(:payload) { '{"action"}' } + + it 'skips processing' do + expect(monitor).not_to receive(:process_job_cancel) + + subject + end + end + end + end + + describe '#stop' do + let!(:monitor_thread) { monitor.start } + + it 'does stop the thread' do + expect(monitor_thread).to be_alive + + expect { monitor.stop }.not_to raise_error + + expect(monitor_thread).not_to be_alive + expect { monitor_thread.value }.to raise_error(Interrupt) + end + end + + describe '#process_job_cancel' do + subject { monitor.send(:process_job_cancel, jid) } + + context 'when jid is missing' do + let(:jid) { nil } + + it 'does not run thread' do + expect(subject).to be_nil + end + end + + context 'when jid is provided' do + let(:jid) { 'my-jid' } + + context 'when jid is not found' do + it 'does not log cancellation message' do + expect(Sidekiq.logger).not_to receive(:warn) + expect(subject).to be_nil + end + end + + context 'when jid is found' do + let(:thread) { Thread.new { sleep 1000 } } + + before do + monitor.jobs_thread[jid] = thread + end + + after do + thread.kill + rescue + end + + it 'does log cancellation message' do + expect(Sidekiq.logger).to receive(:warn) + .with( + class: described_class.to_s, + action: 'cancel', + message: 'Canceling thread with CancelledError', + jid: 'my-jid', + thread_id: thread.object_id) + + expect(subject).to be_a(Thread) + + subject.join + end + + it 'does cancel the thread' do + expect(subject).to be_a(Thread) + + subject.join + + # we wait for the thread to be cancelled + # by `process_job_cancel` + expect { thread.join(5) }.to raise_error(described_class::CancelledError) + end + end + end + end + + describe '.cancel_job' do + subject { described_class.cancel_job('my-jid') } + + it 'sets a redis key' do + expect_any_instance_of(::Redis).to receive(:setex) + .with('sidekiq:cancel:my-jid', anything, 1) + + subject + end + + it 'notifies all workers' do + payload = '{"action":"cancel","jid":"my-jid"}' + + expect_any_instance_of(::Redis).to receive(:publish) + .with('sidekiq:cancel:notifications', payload) + + subject + end + end +end |