diff options
author | Kamil Trzciński <ayufan@ayufan.eu> | 2019-08-20 17:25:04 +0200 |
---|---|---|
committer | Kamil Trzciński <ayufan@ayufan.eu> | 2019-08-21 12:05:30 +0200 |
commit | c2cbfc5c4afbe8385659f97769db8450284639cf (patch) | |
tree | 430ac243924b4b3fb4e389a9c763ea6bd484c2f0 | |
parent | 75e2302d0126c4bc8ea215ffb4e72612d44e73bb (diff) | |
download | gitlab-ce-c2cbfc5c4afbe8385659f97769db8450284639cf.tar.gz |
Rework `Sidekiq::JobsThreads` into `Monitor`
This makes:
- very shallow `Middleware::Monitor` to only request tracking
of sidekiq jobs,
- `SidekiqStatus::Monitor` to be responsible to maintain persistent
connection to receive messages,
- `SidekiqStatus::Monitor` to always use structured logging
and instance variables
-rw-r--r-- | config/initializers/sidekiq.rb | 6 | ||||
-rw-r--r-- | doc/administration/troubleshooting/sidekiq.md | 118 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_middleware/jobs_threads.rb | 49 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_middleware/monitor.rb | 13 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_monitor.rb | 153 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_status/monitor.rb | 46 | ||||
-rw-r--r-- | spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb | 83 | ||||
-rw-r--r-- | spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb | 29 | ||||
-rw-r--r-- | spec/lib/gitlab/sidekiq_monitor_spec.rb | 206 |
9 files changed, 523 insertions, 180 deletions
diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index b05d4342e23..e145af5e2d5 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -33,7 +33,7 @@ Sidekiq.configure_server do |config| config.redis = queues_config_hash config.server_middleware do |chain| - chain.add Gitlab::SidekiqMiddleware::JobsThreads unless ENV['DISABLE_SIDEKIQ_INTERRUPT_RUNNING_JOBS'] + chain.add Gitlab::SidekiqMiddleware::Monitor chain.add Gitlab::SidekiqMiddleware::Metrics if Settings.monitoring.sidekiq_exporter chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] && !enable_json_logs chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] @@ -59,7 +59,9 @@ Sidekiq.configure_server do |config| # Sidekiq (e.g. in an initializer). ActiveRecord::Base.clear_all_connections! - Gitlab::SidekiqStatus::Monitor.instance.start unless ENV['DISABLE_SIDEKIQ_INTERRUPT_RUNNING_JOBS'] + if ENV.fetch("SIDEKIQ_MONITOR_WORKER", 0).to_i.nonzero? + Gitlab::SidekiqMonitor.instance.start + end end if enable_reliable_fetch? diff --git a/doc/administration/troubleshooting/sidekiq.md b/doc/administration/troubleshooting/sidekiq.md index 7067958ecb4..9b016c64e29 100644 --- a/doc/administration/troubleshooting/sidekiq.md +++ b/doc/administration/troubleshooting/sidekiq.md @@ -169,3 +169,121 @@ The PostgreSQL wiki has details on the query you can run to see blocking queries. The query is different based on PostgreSQL version. See [Lock Monitoring](https://wiki.postgresql.org/wiki/Lock_Monitoring) for the query details. + +## Managing Sidekiq queues + +It is possible to use the [Sidekiq API](https://github.com/mperham/sidekiq/wiki/API) +to perform a number of troubleshooting steps on Sidekiq. + +These are administrative commands and should only be used if the current +admin interface is not suitable due to the scale of the installation. + +All these commands should be run using `gitlab-rails console`.
+ +### View the queue size + +```ruby +Sidekiq::Queue.new("pipeline_processing:build_queue").size +``` + +### Enumerate all enqueued jobs + +```ruby +queue = Sidekiq::Queue.new("chaos:chaos_sleep") +queue.each do |job| + # job.klass # => 'MyWorker' + # job.args # => [1, 2, 3] + # job.jid # => jid + # job.queue # => chaos:chaos_sleep + # job["retry"] # => 3 + # job.item # => { + # "class"=>"Chaos::SleepWorker", + # "args"=>[1000], + # "retry"=>3, + # "queue"=>"chaos:chaos_sleep", + # "backtrace"=>true, + # "queue_namespace"=>"chaos", + # "jid"=>"39bc482b823cceaf07213523", + # "created_at"=>1566317076.266069, + # "correlation_id"=>"c323b832-a857-4858-b695-672de6f0e1af", + # "enqueued_at"=>1566317076.26761}, + # } + + # job.delete if job.jid == 'abcdef1234567890' +end +``` + +### Enumerate currently running jobs + +```ruby +workers = Sidekiq::Workers.new +workers.each do |process_id, thread_id, work| + # process_id is a unique identifier per Sidekiq process + # thread_id is a unique identifier per thread + # work is a Hash which looks like: + # {"queue"=>"chaos:chaos_sleep", + # "payload"=> + # { "class"=>"Chaos::SleepWorker", + # "args"=>[1000], + # "retry"=>3, + # "queue"=>"chaos:chaos_sleep", + # "backtrace"=>true, + # "queue_namespace"=>"chaos", + # "jid"=>"b2a31e3eac7b1a99ff235869", + # "created_at"=>1566316974.9215662, + # "correlation_id"=>"e484fb26-7576-45f9-bf21-b99389e1c53c", + # "enqueued_at"=>1566316974.9229589}, + # "run_at"=>1566316974}], +end +``` + +### Remove sidekiq jobs for given parameters (destructive) + +```ruby +# for jobs like this: +# RepositoryImportWorker.new.perform_async(100) +id_list = [100] + +queue = Sidekiq::Queue.new('repository_import') +queue.each do |job| + job.delete if id_list.include?(job.args[0]) +end +``` + +### Remove specific job ID (destructive) + +```ruby +queue = Sidekiq::Queue.new('repository_import') +queue.each do |job| + job.delete if job.jid == 'my-job-id' +end +``` + +## Canceling running jobs (destructive) + +> 
Introduced in GitLab 12.3. + +This is a highly risky operation; use it only as a last resort. +Doing so might result in data corruption, as the job +is interrupted mid-execution and it is not guaranteed +that a proper rollback of transactions is implemented. + +```ruby +Gitlab::SidekiqMonitor.cancel_job('job-id') +``` + +> This requires Sidekiq to be run with the `SIDEKIQ_MONITOR_WORKER=1` +> environment variable. + +To perform the interrupt we use `Thread.raise`, which +has a number of drawbacks, as mentioned in [Why Ruby’s Timeout is dangerous (and Thread.raise is terrifying)](https://jvns.ca/blog/2015/11/27/why-rubys-timeout-is-dangerous-and-thread-dot-raise-is-terrifying/): + +> This is where the implications get interesting, and terrifying. This means that an exception can get raised: +> +> * during a network request (ok, as long as the surrounding code is prepared to catch Timeout::Error) +> * during the cleanup for the network request +> * during a rescue block +> * while creating an object to save to the database afterwards +> * in any of your code, regardless of whether it could have possibly raised an exception before +> +> Nobody writes code to defend against an exception being raised on literally any line. That’s not even possible. So Thread.raise is basically like a sneak attack on your code that could result in almost anything. It would probably be okay if it were pure-functional code that did not modify any state.
But this is Ruby, so that’s unlikely :) diff --git a/lib/gitlab/sidekiq_middleware/jobs_threads.rb b/lib/gitlab/sidekiq_middleware/jobs_threads.rb deleted file mode 100644 index d0603bcee2d..00000000000 --- a/lib/gitlab/sidekiq_middleware/jobs_threads.rb +++ /dev/null @@ -1,49 +0,0 @@ -# frozen_string_literal: true - -module Gitlab - module SidekiqMiddleware - class JobsThreads - @@jobs = {} # rubocop:disable Style/ClassVars - MUTEX = Mutex.new - - def call(worker, job, queue) - jid = job['jid'] - - MUTEX.synchronize do - @@jobs[jid] = Thread.current - end - - return if self.class.cancelled?(jid) - - yield - ensure - MUTEX.synchronize do - @@jobs.delete(jid) - end - end - - def self.interrupt(jid) - MUTEX.synchronize do - thread = @@jobs[jid] - break unless thread - - thread.raise(Interrupt) - thread - end - end - - def self.cancelled?(jid) - Sidekiq.redis {|c| c.exists("cancelled-#{jid}") } - end - - def self.mark_job_as_cancelled(jid) - Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 86400, 1) } - "Marked job as cancelled(if Sidekiq retry within 24 hours, the job will be skipped as `processed`). 
Jid: #{jid}" - end - - def self.jobs - @@jobs - end - end - end -end diff --git a/lib/gitlab/sidekiq_middleware/monitor.rb b/lib/gitlab/sidekiq_middleware/monitor.rb new file mode 100644 index 00000000000..2d0e5a6d635 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/monitor.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + class Monitor + def call(worker, job, queue) + Gitlab::SidekiqMonitor.instance.within_job(job['jid'], queue) do + yield + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_monitor.rb b/lib/gitlab/sidekiq_monitor.rb new file mode 100644 index 00000000000..0d4709508a0 --- /dev/null +++ b/lib/gitlab/sidekiq_monitor.rb @@ -0,0 +1,153 @@ +# frozen_string_literal: true + +module Gitlab + class SidekiqMonitor < Daemon + include ::Gitlab::Utils::StrongMemoize + + NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze + CANCEL_DEADLINE = 24.hours.seconds + + # We use exception derived from `Exception` + # to consider this as an very low-level exception + # that should not be caught by application + CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException + + attr_reader :jobs_thread + attr_reader :jobs_mutex + + def initialize + super + + @jobs_thread = {} + @jobs_mutex = Mutex.new + end + + def within_job(jid, queue) + jobs_mutex.synchronize do + jobs_thread[jid] = Thread.current + end + + if cancelled?(jid) + Sidekiq.logger.warn( + class: self.class, + action: 'run', + queue: queue, + jid: jid, + canceled: true) + raise CancelledError + end + + yield + ensure + jobs_mutex.synchronize do + jobs_thread.delete(jid) + end + end + + def start_working + Sidekiq.logger.info( + class: self.class, + action: 'start', + message: 'Starting Monitor Daemon') + + ::Gitlab::Redis::SharedState.with do |redis| + redis.subscribe(NOTIFICATION_CHANNEL) do |on| + on.message do |channel, message| + process_message(message) + end + end + end + + Sidekiq.logger.warn( + class: self.class, + 
action: 'stop', + message: 'Stopping Monitor Daemon') + rescue Exception => e # rubocop:disable Lint/RescueException + Sidekiq.logger.warn( + class: self.class, + action: 'exception', + message: e.message) + raise e + end + + def self.cancel_job(jid) + payload = { + action: 'cancel', + jid: jid + }.to_json + + ::Gitlab::Redis::SharedState.with do |redis| + redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1) + redis.publish(NOTIFICATION_CHANNEL, payload) + end + end + + private + + def process_message(message) + Sidekiq.logger.info( + class: self.class, + channel: NOTIFICATION_CHANNEL, + message: 'Received payload on channel', + payload: message) + + message = safe_parse(message) + return unless message + + case message['action'] + when 'cancel' + process_job_cancel(message['jid']) + else + # unknown message + end + end + + def safe_parse(message) + JSON.parse(message) + rescue JSON::ParserError + end + + def process_job_cancel(jid) + return unless jid + + # since this might take time, process cancel in a new thread + Thread.new do + find_thread(jid) do |thread| + next unless thread + + Sidekiq.logger.warn( + class: self.class, + action: 'cancel', + message: 'Canceling thread with CancelledError', + jid: jid, + thread_id: thread.object_id) + + thread&.raise(CancelledError) + end + end + end + + # This method needs to be thread-safe + # This is why it passes thread in block, + # to ensure that we do process this thread + def find_thread(jid) + return unless jid + + jobs_mutex.synchronize do + thread = jobs_thread[jid] + yield(thread) + thread + end + end + + def cancelled?(jid) + ::Gitlab::Redis::SharedState.with do |redis| + redis.exists(self.class.cancel_job_key(jid)) + end + end + + def self.cancel_job_key(jid) + "sidekiq:cancel:#{jid}" + end + end +end diff --git a/lib/gitlab/sidekiq_status/monitor.rb b/lib/gitlab/sidekiq_status/monitor.rb deleted file mode 100644 index 3fd9f02b166..00000000000 --- a/lib/gitlab/sidekiq_status/monitor.rb +++ /dev/null @@ -1,46 
+0,0 @@ -# frozen_string_literal: true - -module Gitlab - module SidekiqStatus - class Monitor < Daemon - include ::Gitlab::Utils::StrongMemoize - - NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze - - def start_working - Sidekiq.logger.info "Watching sidekiq monitor" - - ::Gitlab::Redis::SharedState.with do |redis| - redis.subscribe(NOTIFICATION_CHANNEL) do |on| - on.message do |channel, message| - Sidekiq.logger.info "Received #{message} on #{channel}..." - execute_job_cancel(message) - end - end - end - end - - def self.cancel_job(jid) - Gitlab::Redis::SharedState.with do |redis| - redis.publish(NOTIFICATION_CHANNEL, jid) - "Notification sent. Job should be cancelled soon. Check log to confirm. Jid: #{jid}" - end - end - - private - - def execute_job_cancel(jid) - Gitlab::SidekiqMiddleware::JobsThreads.mark_job_as_cancelled(jid) - - thread = Gitlab::SidekiqMiddleware::JobsThreads - .interrupt(jid) - - if thread - Sidekiq.logger.info "Interrupted thread: #{thread} for #{jid}." - else - Sidekiq.logger.info "Did not find thread for #{jid}." 
- end - end - end - end -end diff --git a/spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb b/spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb deleted file mode 100644 index 58cae3e42e0..00000000000 --- a/spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb +++ /dev/null @@ -1,83 +0,0 @@ -require 'spec_helper' - -describe Gitlab::SidekiqMiddleware::JobsThreads do - subject { described_class.new } - - let(:worker) { double(:worker, class: Chaos::SleepWorker) } - let(:jid) { '581f90fbd2f24deabcbde2f9' } - let(:job) { { 'jid' => jid } } - let(:jid_thread) { '684f90fbd2f24deabcbde2f9' } - let(:job_thread) { { 'jid' => jid_thread } } - let(:queue) { 'test_queue' } - let(:mark_job_as_cancelled) { Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 2, 1) } } - - def run_job - subject.call(worker, job, queue) do - sleep 2 - "mock return from yield" - end - end - - def run_job_thread - Thread.new do - subject.call(worker, job_thread, queue) do - sleep 3 - "mock return from yield" - end - end - end - - describe '.call' do - context 'by default' do - it 'return from yield' do - expect(run_job).to eq("mock return from yield") - end - end - - context 'when job is marked as cancelled' do - before do - mark_job_as_cancelled - end - - it 'return directly' do - expect(run_job).to be_nil - end - end - end - - describe '.self.interrupt' do - before do - run_job_thread - sleep 1 - end - - it 'interrupt the job with correct jid' do - expect(described_class.jobs[jid_thread]).to receive(:raise).with(Interrupt) - expect(described_class.interrupt jid_thread).to eq(described_class.jobs[jid_thread]) - end - - it 'do nothing with wrong jid' do - expect(described_class.jobs[jid_thread]).not_to receive(:raise) - expect(described_class.interrupt 'wrong_jid').to be_nil - end - end - - describe '.self.cancelled?' do - it 'return true when job is marked as cancelled' do - mark_job_as_cancelled - expect(described_class.cancelled? 
jid).to be true - end - - it 'return false when job is not marked as cancelled' do - expect(described_class.cancelled? 'non-exists-jid').to be false - end - end - - describe '.self.mark_job_as_cancelled' do - it 'set Redis key' do - described_class.mark_job_as_cancelled('jid_123') - - expect(described_class.cancelled? 'jid_123').to be true - end - end -end diff --git a/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb new file mode 100644 index 00000000000..3ca2ddf3cb1 --- /dev/null +++ b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Gitlab::SidekiqMiddleware::Monitor do + let(:monitor) { described_class.new } + + describe '#call' do + let(:worker) { double } + let(:job) { { 'jid' => 'job-id' } } + let(:queue) { 'my-queue' } + + it 'calls SidekiqMonitor' do + expect(Gitlab::SidekiqMonitor.instance).to receive(:within_job) + .with('job-id', 'my-queue') + .and_call_original + + expect { |blk| monitor.call(worker, job, queue, &blk) }.to yield_control + end + + it 'passthroughs the return value' do + result = monitor.call(worker, job, queue) do + 'value' + end + + expect(result).to eq('value') + end + end +end diff --git a/spec/lib/gitlab/sidekiq_monitor_spec.rb b/spec/lib/gitlab/sidekiq_monitor_spec.rb new file mode 100644 index 00000000000..7c9fc598b02 --- /dev/null +++ b/spec/lib/gitlab/sidekiq_monitor_spec.rb @@ -0,0 +1,206 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Gitlab::SidekiqMonitor do + let(:monitor) { described_class.new } + + describe '#within_job' do + it 'tracks thread' do + blk = proc do + expect(monitor.jobs_thread['jid']).not_to be_nil + + "OK" + end + + expect(monitor.within_job('jid', 'queue', &blk)).to eq("OK") + end + + context 'when job is canceled' do + let(:jid) { SecureRandom.hex } + + before do + described_class.cancel_job(jid) + end + + it 'does not execute a block' do + 
expect do |blk| + monitor.within_job(jid, 'queue', &blk) + rescue described_class::CancelledError + end.not_to yield_control + end + + it 'raises exception' do + expect { monitor.within_job(jid, 'queue') }.to raise_error(described_class::CancelledError) + end + end + end + + describe '#start_working' do + subject { monitor.start_working } + + context 'when structured logging is used' do + before do + allow_any_instance_of(::Redis).to receive(:subscribe) + end + + it 'logs start message' do + expect(Sidekiq.logger).to receive(:info) + .with( + class: described_class, + action: 'start', + message: 'Starting Monitor Daemon') + + subject + end + + it 'logs stop message' do + expect(Sidekiq.logger).to receive(:warn) + .with( + class: described_class, + action: 'stop', + message: 'Stopping Monitor Daemon') + + subject + end + + it 'logs exception message' do + expect(Sidekiq.logger).to receive(:warn) + .with( + class: described_class, + action: 'exception', + message: 'My Exception') + + expect(::Gitlab::Redis::SharedState).to receive(:with) + .and_raise(Exception, 'My Exception') + + expect { subject }.to raise_error(Exception, 'My Exception') + end + end + + context 'when message is published' do + let(:subscribed) { double } + + before do + expect_any_instance_of(::Redis).to receive(:subscribe) + .and_yield(subscribed) + + expect(subscribed).to receive(:message) + .and_yield( + described_class::NOTIFICATION_CHANNEL, + payload + ) + + expect(Sidekiq.logger).to receive(:info) + .with( + class: described_class, + action: 'start', + message: 'Starting Monitor Daemon') + + expect(Sidekiq.logger).to receive(:info) + .with( + class: described_class, + channel: described_class::NOTIFICATION_CHANNEL, + message: 'Received payload on channel', + payload: payload + ) + end + + context 'and message is valid' do + let(:payload) { '{"action":"cancel","jid":"my-jid"}' } + + it 'processes cancel' do + expect(monitor).to receive(:process_job_cancel).with('my-jid') + + subject + end + 
end + + context 'and message is not valid json' do + let(:payload) { '{"action"}' } + + it 'skips processing' do + expect(monitor).not_to receive(:process_job_cancel) + + subject + end + end + end + end + + describe '#process_job_cancel' do + subject { monitor.send(:process_job_cancel, jid) } + + context 'when jid is missing' do + let(:jid) { nil } + + it 'does not run thread' do + expect(subject).to be_nil + end + end + + context 'when jid is provided' do + let(:jid) { 'my-jid' } + + context 'when jid is not found' do + it 'does not log cancellation message' do + expect(Sidekiq.logger).not_to receive(:warn) + expect(subject).to be_a(Thread) + + subject.join + end + end + + context 'when jid is found' do + let(:thread) { Thread.new { sleep 1000 } } + + before do + monitor.jobs_thread[jid] = thread + end + + it 'does log cancellation message' do + expect(Sidekiq.logger).to receive(:warn) + .with( + class: described_class, + action: 'cancel', + message: 'Canceling thread with CancelledError', + jid: 'my-jid', + thread_id: thread.object_id) + + expect(subject).to be_a(Thread) + + subject.join + end + + it 'does cancel the thread' do + expect(subject).to be_a(Thread) + + subject.join + + expect(thread).not_to be_alive + expect { thread.value }.to raise_error(described_class::CancelledError) + end + end + end + end + + describe '.cancel_job' do + subject { described_class.cancel_job('my-jid') } + + it 'sets a redis key' do + expect_any_instance_of(::Redis).to receive(:setex) + .with('sidekiq:cancel:my-jid', anything, 1) + + subject + end + + it 'notifies all workers' do + payload = '{"action":"cancel","jid":"my-jid"}' + + expect_any_instance_of(::Redis).to receive(:publish) + .with('sidekiq:cancel:notifications', payload) + + subject + end + end +end |