summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKamil Trzciński <ayufan@ayufan.eu>2019-08-20 17:25:04 +0200
committerKamil Trzciński <ayufan@ayufan.eu>2019-08-21 12:05:30 +0200
commitc2cbfc5c4afbe8385659f97769db8450284639cf (patch)
tree430ac243924b4b3fb4e389a9c763ea6bd484c2f0
parent75e2302d0126c4bc8ea215ffb4e72612d44e73bb (diff)
downloadgitlab-ce-c2cbfc5c4afbe8385659f97769db8450284639cf.tar.gz
Rework `Sidekiq::JobsThreads` into `Monitor`
This makes: - very shallow `Middleware::Monitor` to only request tracking of sidekiq jobs, - `SidekiqStatus::Monitor` to be responsible to maintain persistent connection to receive messages, - `SidekiqStatus::Monitor` to always use structured logging and instance variables
-rw-r--r--config/initializers/sidekiq.rb6
-rw-r--r--doc/administration/troubleshooting/sidekiq.md118
-rw-r--r--lib/gitlab/sidekiq_middleware/jobs_threads.rb49
-rw-r--r--lib/gitlab/sidekiq_middleware/monitor.rb13
-rw-r--r--lib/gitlab/sidekiq_monitor.rb153
-rw-r--r--lib/gitlab/sidekiq_status/monitor.rb46
-rw-r--r--spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb83
-rw-r--r--spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb29
-rw-r--r--spec/lib/gitlab/sidekiq_monitor_spec.rb206
9 files changed, 523 insertions, 180 deletions
diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb
index b05d4342e23..e145af5e2d5 100644
--- a/config/initializers/sidekiq.rb
+++ b/config/initializers/sidekiq.rb
@@ -33,7 +33,7 @@ Sidekiq.configure_server do |config|
config.redis = queues_config_hash
config.server_middleware do |chain|
- chain.add Gitlab::SidekiqMiddleware::JobsThreads unless ENV['DISABLE_SIDEKIQ_INTERRUPT_RUNNING_JOBS']
+ chain.add Gitlab::SidekiqMiddleware::Monitor
chain.add Gitlab::SidekiqMiddleware::Metrics if Settings.monitoring.sidekiq_exporter
chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] && !enable_json_logs
chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS']
@@ -59,7 +59,9 @@ Sidekiq.configure_server do |config|
# Sidekiq (e.g. in an initializer).
ActiveRecord::Base.clear_all_connections!
- Gitlab::SidekiqStatus::Monitor.instance.start unless ENV['DISABLE_SIDEKIQ_INTERRUPT_RUNNING_JOBS']
+ if ENV.fetch("SIDEKIQ_MONITOR_WORKER", 0).to_i.nonzero?
+ Gitlab::SidekiqMonitor.instance.start
+ end
end
if enable_reliable_fetch?
diff --git a/doc/administration/troubleshooting/sidekiq.md b/doc/administration/troubleshooting/sidekiq.md
index 7067958ecb4..9b016c64e29 100644
--- a/doc/administration/troubleshooting/sidekiq.md
+++ b/doc/administration/troubleshooting/sidekiq.md
@@ -169,3 +169,121 @@ The PostgreSQL wiki has details on the query you can run to see blocking
queries. The query is different based on PostgreSQL version. See
[Lock Monitoring](https://wiki.postgresql.org/wiki/Lock_Monitoring) for
the query details.
+
+## Managing Sidekiq queues
+
+It is possible to use [Sidekiq API](https://github.com/mperham/sidekiq/wiki/API)
+to perform a number of troubleshoting on Sidekiq.
+
+These are the administrative commands and it should only be used if currently
+admin interface is not suitable due to scale of installation.
+
+All this commands should be run using `gitlab-rails console`.
+
+### View the queue size
+
+```ruby
+Sidekiq::Queue.new("pipeline_processing:build_queue").size
+```
+
+### Enumerate all enqueued jobs
+
+```ruby
+queue = Sidekiq::Queue.new("chaos:chaos_sleep")
+queue.each do |job|
+ # job.klass # => 'MyWorker'
+ # job.args # => [1, 2, 3]
+ # job.jid # => jid
+ # job.queue # => chaos:chaos_sleep
+ # job["retry"] # => 3
+ # job.item # => {
+ # "class"=>"Chaos::SleepWorker",
+ # "args"=>[1000],
+ # "retry"=>3,
+ # "queue"=>"chaos:chaos_sleep",
+ # "backtrace"=>true,
+ # "queue_namespace"=>"chaos",
+ # "jid"=>"39bc482b823cceaf07213523",
+ # "created_at"=>1566317076.266069,
+ # "correlation_id"=>"c323b832-a857-4858-b695-672de6f0e1af",
+ # "enqueued_at"=>1566317076.26761},
+ # }
+
+ # job.delete if job.jid == 'abcdef1234567890'
+end
+```
+
+### Enumerate currently running jobs
+
+```ruby
+workers = Sidekiq::Workers.new
+workers.each do |process_id, thread_id, work|
+ # process_id is a unique identifier per Sidekiq process
+ # thread_id is a unique identifier per thread
+ # work is a Hash which looks like:
+ # {"queue"=>"chaos:chaos_sleep",
+ # "payload"=>
+ # { "class"=>"Chaos::SleepWorker",
+ # "args"=>[1000],
+ # "retry"=>3,
+ # "queue"=>"chaos:chaos_sleep",
+ # "backtrace"=>true,
+ # "queue_namespace"=>"chaos",
+ # "jid"=>"b2a31e3eac7b1a99ff235869",
+ # "created_at"=>1566316974.9215662,
+ # "correlation_id"=>"e484fb26-7576-45f9-bf21-b99389e1c53c",
+ # "enqueued_at"=>1566316974.9229589},
+ # "run_at"=>1566316974}],
+end
+```
+
+### Remove sidekiq jobs for given parameters (destructive)
+
+```ruby
+# for jobs like this:
+# RepositoryImportWorker.new.perform_async(100)
+id_list = [100]
+
+queue = Sidekiq::Queue.new('repository_import')
+queue.each do |job|
+ job.delete if id_list.include?(job.args[0])
+end
+```
+
+### Remove specific job ID (destructive)
+
+```ruby
+queue = Sidekiq::Queue.new('repository_import')
+queue.each do |job|
+ job.delete if job.jid == 'my-job-id'
+end
+```
+
+## Canceling running jobs (destructive)
+
+> Introduced in GitLab 12.3.
+
+This is highly risky operation and use it as last resort.
+Doing that might result in data corruption, as the job
+is interrupted mid-execution and it is not guaranteed
+that proper rollback of transactions is implemented.
+
+```ruby
+Gitlab::SidekiqMonitor.cancel_job('job-id')
+```
+
+> This requires the Sidekiq to be run with `SIDEKIQ_MONITOR_WORKER=1`
+> environment variable.
+
+To perform of the interrupt we use `Thread.raise` which
+has number of drawbacks, as mentioned in [Why Ruby’s Timeout is dangerous (and Thread.raise is terrifying)](https://jvns.ca/blog/2015/11/27/why-rubys-timeout-is-dangerous-and-thread-dot-raise-is-terrifying/):
+
+> This is where the implications get interesting, and terrifying. This means that an exception can get raised:
+>
+> * during a network request (ok, as long as the surrounding code is prepared to catch Timeout::Error)
+> * during the cleanup for the network request
+> * during a rescue block
+> * while creating an object to save to the database afterwards
+> * in any of your code, regardless of whether it could have possibly raised an exception before
+>
+> Nobody writes code to defend against an exception being raised on literally any line. That’s not even possible. So Thread.raise is basically like a sneak attack on your code that could result in almost anything. It would probably be okay if it were pure-functional code that did not modify any state. But this is Ruby, so that’s unlikely :)
diff --git a/lib/gitlab/sidekiq_middleware/jobs_threads.rb b/lib/gitlab/sidekiq_middleware/jobs_threads.rb
deleted file mode 100644
index d0603bcee2d..00000000000
--- a/lib/gitlab/sidekiq_middleware/jobs_threads.rb
+++ /dev/null
@@ -1,49 +0,0 @@
-# frozen_string_literal: true
-
-module Gitlab
- module SidekiqMiddleware
- class JobsThreads
- @@jobs = {} # rubocop:disable Style/ClassVars
- MUTEX = Mutex.new
-
- def call(worker, job, queue)
- jid = job['jid']
-
- MUTEX.synchronize do
- @@jobs[jid] = Thread.current
- end
-
- return if self.class.cancelled?(jid)
-
- yield
- ensure
- MUTEX.synchronize do
- @@jobs.delete(jid)
- end
- end
-
- def self.interrupt(jid)
- MUTEX.synchronize do
- thread = @@jobs[jid]
- break unless thread
-
- thread.raise(Interrupt)
- thread
- end
- end
-
- def self.cancelled?(jid)
- Sidekiq.redis {|c| c.exists("cancelled-#{jid}") }
- end
-
- def self.mark_job_as_cancelled(jid)
- Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 86400, 1) }
- "Marked job as cancelled(if Sidekiq retry within 24 hours, the job will be skipped as `processed`). Jid: #{jid}"
- end
-
- def self.jobs
- @@jobs
- end
- end
- end
-end
diff --git a/lib/gitlab/sidekiq_middleware/monitor.rb b/lib/gitlab/sidekiq_middleware/monitor.rb
new file mode 100644
index 00000000000..2d0e5a6d635
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/monitor.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ class Monitor
+ def call(worker, job, queue)
+ Gitlab::SidekiqMonitor.instance.within_job(job['jid'], queue) do
+ yield
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_monitor.rb b/lib/gitlab/sidekiq_monitor.rb
new file mode 100644
index 00000000000..0d4709508a0
--- /dev/null
+++ b/lib/gitlab/sidekiq_monitor.rb
@@ -0,0 +1,153 @@
+# frozen_string_literal: true
+
+module Gitlab
+ class SidekiqMonitor < Daemon
+ include ::Gitlab::Utils::StrongMemoize
+
+ NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze
+ CANCEL_DEADLINE = 24.hours.seconds
+
+ # We use exception derived from `Exception`
+ # to consider this as an very low-level exception
+ # that should not be caught by application
+ CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException
+
+ attr_reader :jobs_thread
+ attr_reader :jobs_mutex
+
+ def initialize
+ super
+
+ @jobs_thread = {}
+ @jobs_mutex = Mutex.new
+ end
+
+ def within_job(jid, queue)
+ jobs_mutex.synchronize do
+ jobs_thread[jid] = Thread.current
+ end
+
+ if cancelled?(jid)
+ Sidekiq.logger.warn(
+ class: self.class,
+ action: 'run',
+ queue: queue,
+ jid: jid,
+ canceled: true)
+ raise CancelledError
+ end
+
+ yield
+ ensure
+ jobs_mutex.synchronize do
+ jobs_thread.delete(jid)
+ end
+ end
+
+ def start_working
+ Sidekiq.logger.info(
+ class: self.class,
+ action: 'start',
+ message: 'Starting Monitor Daemon')
+
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.subscribe(NOTIFICATION_CHANNEL) do |on|
+ on.message do |channel, message|
+ process_message(message)
+ end
+ end
+ end
+
+ Sidekiq.logger.warn(
+ class: self.class,
+ action: 'stop',
+ message: 'Stopping Monitor Daemon')
+ rescue Exception => e # rubocop:disable Lint/RescueException
+ Sidekiq.logger.warn(
+ class: self.class,
+ action: 'exception',
+ message: e.message)
+ raise e
+ end
+
+ def self.cancel_job(jid)
+ payload = {
+ action: 'cancel',
+ jid: jid
+ }.to_json
+
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
+ redis.publish(NOTIFICATION_CHANNEL, payload)
+ end
+ end
+
+ private
+
+ def process_message(message)
+ Sidekiq.logger.info(
+ class: self.class,
+ channel: NOTIFICATION_CHANNEL,
+ message: 'Received payload on channel',
+ payload: message)
+
+ message = safe_parse(message)
+ return unless message
+
+ case message['action']
+ when 'cancel'
+ process_job_cancel(message['jid'])
+ else
+ # unknown message
+ end
+ end
+
+ def safe_parse(message)
+ JSON.parse(message)
+ rescue JSON::ParserError
+ end
+
+ def process_job_cancel(jid)
+ return unless jid
+
+ # since this might take time, process cancel in a new thread
+ Thread.new do
+ find_thread(jid) do |thread|
+ next unless thread
+
+ Sidekiq.logger.warn(
+ class: self.class,
+ action: 'cancel',
+ message: 'Canceling thread with CancelledError',
+ jid: jid,
+ thread_id: thread.object_id)
+
+ thread&.raise(CancelledError)
+ end
+ end
+ end
+
+ # This method needs to be thread-safe
+ # This is why it passes thread in block,
+ # to ensure that we do process this thread
+ def find_thread(jid)
+ return unless jid
+
+ jobs_mutex.synchronize do
+ thread = jobs_thread[jid]
+ yield(thread)
+ thread
+ end
+ end
+
+ def cancelled?(jid)
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.exists(self.class.cancel_job_key(jid))
+ end
+ end
+
+ def self.cancel_job_key(jid)
+ "sidekiq:cancel:#{jid}"
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_status/monitor.rb b/lib/gitlab/sidekiq_status/monitor.rb
deleted file mode 100644
index 3fd9f02b166..00000000000
--- a/lib/gitlab/sidekiq_status/monitor.rb
+++ /dev/null
@@ -1,46 +0,0 @@
-# frozen_string_literal: true
-
-module Gitlab
- module SidekiqStatus
- class Monitor < Daemon
- include ::Gitlab::Utils::StrongMemoize
-
- NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze
-
- def start_working
- Sidekiq.logger.info "Watching sidekiq monitor"
-
- ::Gitlab::Redis::SharedState.with do |redis|
- redis.subscribe(NOTIFICATION_CHANNEL) do |on|
- on.message do |channel, message|
- Sidekiq.logger.info "Received #{message} on #{channel}..."
- execute_job_cancel(message)
- end
- end
- end
- end
-
- def self.cancel_job(jid)
- Gitlab::Redis::SharedState.with do |redis|
- redis.publish(NOTIFICATION_CHANNEL, jid)
- "Notification sent. Job should be cancelled soon. Check log to confirm. Jid: #{jid}"
- end
- end
-
- private
-
- def execute_job_cancel(jid)
- Gitlab::SidekiqMiddleware::JobsThreads.mark_job_as_cancelled(jid)
-
- thread = Gitlab::SidekiqMiddleware::JobsThreads
- .interrupt(jid)
-
- if thread
- Sidekiq.logger.info "Interrupted thread: #{thread} for #{jid}."
- else
- Sidekiq.logger.info "Did not find thread for #{jid}."
- end
- end
- end
- end
-end
diff --git a/spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb b/spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb
deleted file mode 100644
index 58cae3e42e0..00000000000
--- a/spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb
+++ /dev/null
@@ -1,83 +0,0 @@
-require 'spec_helper'
-
-describe Gitlab::SidekiqMiddleware::JobsThreads do
- subject { described_class.new }
-
- let(:worker) { double(:worker, class: Chaos::SleepWorker) }
- let(:jid) { '581f90fbd2f24deabcbde2f9' }
- let(:job) { { 'jid' => jid } }
- let(:jid_thread) { '684f90fbd2f24deabcbde2f9' }
- let(:job_thread) { { 'jid' => jid_thread } }
- let(:queue) { 'test_queue' }
- let(:mark_job_as_cancelled) { Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 2, 1) } }
-
- def run_job
- subject.call(worker, job, queue) do
- sleep 2
- "mock return from yield"
- end
- end
-
- def run_job_thread
- Thread.new do
- subject.call(worker, job_thread, queue) do
- sleep 3
- "mock return from yield"
- end
- end
- end
-
- describe '.call' do
- context 'by default' do
- it 'return from yield' do
- expect(run_job).to eq("mock return from yield")
- end
- end
-
- context 'when job is marked as cancelled' do
- before do
- mark_job_as_cancelled
- end
-
- it 'return directly' do
- expect(run_job).to be_nil
- end
- end
- end
-
- describe '.self.interrupt' do
- before do
- run_job_thread
- sleep 1
- end
-
- it 'interrupt the job with correct jid' do
- expect(described_class.jobs[jid_thread]).to receive(:raise).with(Interrupt)
- expect(described_class.interrupt jid_thread).to eq(described_class.jobs[jid_thread])
- end
-
- it 'do nothing with wrong jid' do
- expect(described_class.jobs[jid_thread]).not_to receive(:raise)
- expect(described_class.interrupt 'wrong_jid').to be_nil
- end
- end
-
- describe '.self.cancelled?' do
- it 'return true when job is marked as cancelled' do
- mark_job_as_cancelled
- expect(described_class.cancelled? jid).to be true
- end
-
- it 'return false when job is not marked as cancelled' do
- expect(described_class.cancelled? 'non-exists-jid').to be false
- end
- end
-
- describe '.self.mark_job_as_cancelled' do
- it 'set Redis key' do
- described_class.mark_job_as_cancelled('jid_123')
-
- expect(described_class.cancelled? 'jid_123').to be true
- end
- end
-end
diff --git a/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb
new file mode 100644
index 00000000000..3ca2ddf3cb1
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+describe Gitlab::SidekiqMiddleware::Monitor do
+ let(:monitor) { described_class.new }
+
+ describe '#call' do
+ let(:worker) { double }
+ let(:job) { { 'jid' => 'job-id' } }
+ let(:queue) { 'my-queue' }
+
+ it 'calls SidekiqMonitor' do
+ expect(Gitlab::SidekiqMonitor.instance).to receive(:within_job)
+ .with('job-id', 'my-queue')
+ .and_call_original
+
+ expect { |blk| monitor.call(worker, job, queue, &blk) }.to yield_control
+ end
+
+ it 'passthroughs the return value' do
+ result = monitor.call(worker, job, queue) do
+ 'value'
+ end
+
+ expect(result).to eq('value')
+ end
+ end
+end
diff --git a/spec/lib/gitlab/sidekiq_monitor_spec.rb b/spec/lib/gitlab/sidekiq_monitor_spec.rb
new file mode 100644
index 00000000000..7c9fc598b02
--- /dev/null
+++ b/spec/lib/gitlab/sidekiq_monitor_spec.rb
@@ -0,0 +1,206 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+describe Gitlab::SidekiqMonitor do
+ let(:monitor) { described_class.new }
+
+ describe '#within_job' do
+ it 'tracks thread' do
+ blk = proc do
+ expect(monitor.jobs_thread['jid']).not_to be_nil
+
+ "OK"
+ end
+
+ expect(monitor.within_job('jid', 'queue', &blk)).to eq("OK")
+ end
+
+ context 'when job is canceled' do
+ let(:jid) { SecureRandom.hex }
+
+ before do
+ described_class.cancel_job(jid)
+ end
+
+ it 'does not execute a block' do
+ expect do |blk|
+ monitor.within_job(jid, 'queue', &blk)
+ rescue described_class::CancelledError
+ end.not_to yield_control
+ end
+
+ it 'raises exception' do
+ expect { monitor.within_job(jid, 'queue') }.to raise_error(described_class::CancelledError)
+ end
+ end
+ end
+
+ describe '#start_working' do
+ subject { monitor.start_working }
+
+ context 'when structured logging is used' do
+ before do
+ allow_any_instance_of(::Redis).to receive(:subscribe)
+ end
+
+ it 'logs start message' do
+ expect(Sidekiq.logger).to receive(:info)
+ .with(
+ class: described_class,
+ action: 'start',
+ message: 'Starting Monitor Daemon')
+
+ subject
+ end
+
+ it 'logs stop message' do
+ expect(Sidekiq.logger).to receive(:warn)
+ .with(
+ class: described_class,
+ action: 'stop',
+ message: 'Stopping Monitor Daemon')
+
+ subject
+ end
+
+ it 'logs exception message' do
+ expect(Sidekiq.logger).to receive(:warn)
+ .with(
+ class: described_class,
+ action: 'exception',
+ message: 'My Exception')
+
+ expect(::Gitlab::Redis::SharedState).to receive(:with)
+ .and_raise(Exception, 'My Exception')
+
+ expect { subject }.to raise_error(Exception, 'My Exception')
+ end
+ end
+
+ context 'when message is published' do
+ let(:subscribed) { double }
+
+ before do
+ expect_any_instance_of(::Redis).to receive(:subscribe)
+ .and_yield(subscribed)
+
+ expect(subscribed).to receive(:message)
+ .and_yield(
+ described_class::NOTIFICATION_CHANNEL,
+ payload
+ )
+
+ expect(Sidekiq.logger).to receive(:info)
+ .with(
+ class: described_class,
+ action: 'start',
+ message: 'Starting Monitor Daemon')
+
+ expect(Sidekiq.logger).to receive(:info)
+ .with(
+ class: described_class,
+ channel: described_class::NOTIFICATION_CHANNEL,
+ message: 'Received payload on channel',
+ payload: payload
+ )
+ end
+
+ context 'and message is valid' do
+ let(:payload) { '{"action":"cancel","jid":"my-jid"}' }
+
+ it 'processes cancel' do
+ expect(monitor).to receive(:process_job_cancel).with('my-jid')
+
+ subject
+ end
+ end
+
+ context 'and message is not valid json' do
+ let(:payload) { '{"action"}' }
+
+ it 'skips processing' do
+ expect(monitor).not_to receive(:process_job_cancel)
+
+ subject
+ end
+ end
+ end
+ end
+
+ describe '#process_job_cancel' do
+ subject { monitor.send(:process_job_cancel, jid) }
+
+ context 'when jid is missing' do
+ let(:jid) { nil }
+
+ it 'does not run thread' do
+ expect(subject).to be_nil
+ end
+ end
+
+ context 'when jid is provided' do
+ let(:jid) { 'my-jid' }
+
+ context 'when jid is not found' do
+ it 'does not log cancellation message' do
+ expect(Sidekiq.logger).not_to receive(:warn)
+ expect(subject).to be_a(Thread)
+
+ subject.join
+ end
+ end
+
+ context 'when jid is found' do
+ let(:thread) { Thread.new { sleep 1000 } }
+
+ before do
+ monitor.jobs_thread[jid] = thread
+ end
+
+ it 'does log cancellation message' do
+ expect(Sidekiq.logger).to receive(:warn)
+ .with(
+ class: described_class,
+ action: 'cancel',
+ message: 'Canceling thread with CancelledError',
+ jid: 'my-jid',
+ thread_id: thread.object_id)
+
+ expect(subject).to be_a(Thread)
+
+ subject.join
+ end
+
+ it 'does cancel the thread' do
+ expect(subject).to be_a(Thread)
+
+ subject.join
+
+ expect(thread).not_to be_alive
+ expect { thread.value }.to raise_error(described_class::CancelledError)
+ end
+ end
+ end
+ end
+
+ describe '.cancel_job' do
+ subject { described_class.cancel_job('my-jid') }
+
+ it 'sets a redis key' do
+ expect_any_instance_of(::Redis).to receive(:setex)
+ .with('sidekiq:cancel:my-jid', anything, 1)
+
+ subject
+ end
+
+ it 'notifies all workers' do
+ payload = '{"action":"cancel","jid":"my-jid"}'
+
+ expect_any_instance_of(::Redis).to receive(:publish)
+ .with('sidekiq:cancel:notifications', payload)
+
+ subject
+ end
+ end
+end