From eeeaebe60813393940751ea9bf81e720b65bf95a Mon Sep 17 00:00:00 2001 From: Qingyu Zhao Date: Tue, 10 Sep 2019 22:12:05 +1000 Subject: Change Sidekiq monitor namespace Move Gitlab::SidekiqMonitor to namespace Gitlab::SidekiqDaemon::Monitor - Class name and file name change - File path change to lib/gitlab/sidekiq_daemon/monitor.rb - Update class usage/reference in other files, including documentation --- config/initializers/sidekiq.rb | 2 +- doc/administration/troubleshooting/sidekiq.md | 2 +- lib/gitlab/sidekiq_daemon/monitor.rb | 184 +++++++++++++++ lib/gitlab/sidekiq_middleware/monitor.rb | 4 +- lib/gitlab/sidekiq_monitor.rb | 182 -------------- spec/lib/gitlab/sidekiq_daemon/monitor_spec.rb | 261 +++++++++++++++++++++ spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb | 6 +- spec/lib/gitlab/sidekiq_monitor_spec.rb | 261 --------------------- 8 files changed, 452 insertions(+), 450 deletions(-) create mode 100644 lib/gitlab/sidekiq_daemon/monitor.rb delete mode 100644 lib/gitlab/sidekiq_monitor.rb create mode 100644 spec/lib/gitlab/sidekiq_daemon/monitor_spec.rb delete mode 100644 spec/lib/gitlab/sidekiq_monitor_spec.rb diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index 9f3e104bc2b..20f31ff6810 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -60,7 +60,7 @@ Sidekiq.configure_server do |config| # Sidekiq (e.g. in an initializer). ActiveRecord::Base.clear_all_connections! - Gitlab::SidekiqMonitor.instance.start if enable_sidekiq_monitor + Gitlab::SidekiqDaemon::Monitor.instance.start if enable_sidekiq_monitor end if enable_reliable_fetch? diff --git a/doc/administration/troubleshooting/sidekiq.md b/doc/administration/troubleshooting/sidekiq.md index c41edb5dbfc..fdafac8420e 100644 --- a/doc/administration/troubleshooting/sidekiq.md +++ b/doc/administration/troubleshooting/sidekiq.md @@ -270,7 +270,7 @@ is interrupted mid-execution and it is not guaranteed that proper rollback of transactions is implemented. ```ruby -Gitlab::SidekiqMonitor.cancel_job('job-id') +Gitlab::SidekiqDaemon::Monitor.cancel_job('job-id') ``` > This requires the Sidekiq to be run with `SIDEKIQ_MONITOR_WORKER=1` diff --git a/lib/gitlab/sidekiq_daemon/monitor.rb b/lib/gitlab/sidekiq_daemon/monitor.rb new file mode 100644 index 00000000000..bbfca130425 --- /dev/null +++ b/lib/gitlab/sidekiq_daemon/monitor.rb @@ -0,0 +1,184 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqDaemon + class Monitor < Daemon + include ::Gitlab::Utils::StrongMemoize + + NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications' + CANCEL_DEADLINE = 24.hours.seconds + RECONNECT_TIME = 3.seconds + + # We use exception derived from `Exception` + # to consider this as an very low-level exception + # that should not be caught by application + CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException + + attr_reader :jobs_thread + attr_reader :jobs_mutex + + def initialize + super + + @jobs_thread = {} + @jobs_mutex = Mutex.new + end + + def within_job(jid, queue) + jobs_mutex.synchronize do + jobs_thread[jid] = Thread.current + end + + if cancelled?(jid) + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'run', + queue: queue, + jid: jid, + canceled: true + ) + raise CancelledError + end + + yield + ensure + jobs_mutex.synchronize do + jobs_thread.delete(jid) + end + end + + def self.cancel_job(jid) + payload = { + action: 'cancel', + jid: jid + }.to_json + + ::Gitlab::Redis::SharedState.with do |redis| + redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1) + redis.publish(NOTIFICATION_CHANNEL, payload) + end + end + + private + + def start_working + Sidekiq.logger.info( + class: self.class.to_s, + action: 'start', + message: 'Starting Monitor Daemon' + ) + + while enabled? + process_messages + sleep(RECONNECT_TIME) + end + + ensure + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'stop', + message: 'Stopping Monitor Daemon' + ) + end + + def stop_working + thread.raise(Interrupt) if thread.alive? + end + + def process_messages + ::Gitlab::Redis::SharedState.with do |redis| + redis.subscribe(NOTIFICATION_CHANNEL) do |on| + on.message do |channel, message| + process_message(message) + end + end + end + rescue Exception => e # rubocop:disable Lint/RescueException + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'exception', + message: e.message + ) + + # we re-raise system exceptions + raise e unless e.is_a?(StandardError) + end + + def process_message(message) + Sidekiq.logger.info( + class: self.class.to_s, + channel: NOTIFICATION_CHANNEL, + message: 'Received payload on channel', + payload: message + ) + + message = safe_parse(message) + return unless message + + case message['action'] + when 'cancel' + process_job_cancel(message['jid']) + else + # unknown message + end + end + + def safe_parse(message) + JSON.parse(message) + rescue JSON::ParserError + end + + def process_job_cancel(jid) + return unless jid + + # try to find thread without lock + return unless find_thread_unsafe(jid) + + Thread.new do + # try to find a thread, but with guaranteed + # that handle for thread corresponds to actually + # running job + find_thread_with_lock(jid) do |thread| + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'cancel', + message: 'Canceling thread with CancelledError', + jid: jid, + thread_id: thread.object_id + ) + + thread&.raise(CancelledError) + end + end + end + + # This method needs to be thread-safe + # This is why it passes thread in block, + # to ensure that we do process this thread + def find_thread_unsafe(jid) + jobs_thread[jid] + end + + def find_thread_with_lock(jid) + # don't try to lock if we cannot find the thread + return unless find_thread_unsafe(jid) + + jobs_mutex.synchronize do + find_thread_unsafe(jid).tap do |thread| + yield(thread) if thread + end + end + end + + def cancelled?(jid) + ::Gitlab::Redis::SharedState.with do |redis| + redis.exists(self.class.cancel_job_key(jid)) + end + end + + def self.cancel_job_key(jid) + "sidekiq:cancel:#{jid}" + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/monitor.rb b/lib/gitlab/sidekiq_middleware/monitor.rb index 53a6132edac..00965bf5506 100644 --- a/lib/gitlab/sidekiq_middleware/monitor.rb +++ b/lib/gitlab/sidekiq_middleware/monitor.rb @@ -4,10 +4,10 @@ module Gitlab module SidekiqMiddleware class Monitor def call(worker, job, queue) - Gitlab::SidekiqMonitor.instance.within_job(job['jid'], queue) do + Gitlab::SidekiqDaemon::Monitor.instance.within_job(job['jid'], queue) do yield end - rescue Gitlab::SidekiqMonitor::CancelledError + rescue Gitlab::SidekiqDaemon::Monitor::CancelledError # push job to DeadSet payload = ::Sidekiq.dump_json(job) ::Sidekiq::DeadSet.new.kill(payload, notify_failure: false) diff --git a/lib/gitlab/sidekiq_monitor.rb b/lib/gitlab/sidekiq_monitor.rb deleted file mode 100644 index a58b33534bf..00000000000 --- a/lib/gitlab/sidekiq_monitor.rb +++ /dev/null @@ -1,182 +0,0 @@ -# frozen_string_literal: true - -module Gitlab - class SidekiqMonitor < Daemon - include ::Gitlab::Utils::StrongMemoize - - NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications' - CANCEL_DEADLINE = 24.hours.seconds - RECONNECT_TIME = 3.seconds - - # We use exception derived from `Exception` - # to consider this as an very low-level exception - # that should not be caught by application - CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException - - attr_reader :jobs_thread - attr_reader :jobs_mutex - - def initialize - super - - @jobs_thread = {} - @jobs_mutex = Mutex.new - end - - def within_job(jid, queue) - jobs_mutex.synchronize do - jobs_thread[jid] = Thread.current - end - - if cancelled?(jid) - Sidekiq.logger.warn( - class: self.class.to_s, - action: 'run', - queue: queue, - jid: jid, - canceled: true - ) - raise CancelledError - end - - yield - ensure - jobs_mutex.synchronize do - jobs_thread.delete(jid) - end - end - - def self.cancel_job(jid) - payload = { - action: 'cancel', - jid: jid - }.to_json - - ::Gitlab::Redis::SharedState.with do |redis| - redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1) - redis.publish(NOTIFICATION_CHANNEL, payload) - end - end - - private - - def start_working - Sidekiq.logger.info( - class: self.class.to_s, - action: 'start', - message: 'Starting Monitor Daemon' - ) - - while enabled? - process_messages - sleep(RECONNECT_TIME) - end - - ensure - Sidekiq.logger.warn( - class: self.class.to_s, - action: 'stop', - message: 'Stopping Monitor Daemon' - ) - end - - def stop_working - thread.raise(Interrupt) if thread.alive? - end - - def process_messages - ::Gitlab::Redis::SharedState.with do |redis| - redis.subscribe(NOTIFICATION_CHANNEL) do |on| - on.message do |channel, message| - process_message(message) - end - end - end - rescue Exception => e # rubocop:disable Lint/RescueException - Sidekiq.logger.warn( - class: self.class.to_s, - action: 'exception', - message: e.message - ) - - # we re-raise system exceptions - raise e unless e.is_a?(StandardError) - end - - def process_message(message) - Sidekiq.logger.info( - class: self.class.to_s, - channel: NOTIFICATION_CHANNEL, - message: 'Received payload on channel', - payload: message - ) - - message = safe_parse(message) - return unless message - - case message['action'] - when 'cancel' - process_job_cancel(message['jid']) - else - # unknown message - end - end - - def safe_parse(message) - JSON.parse(message) - rescue JSON::ParserError - end - - def process_job_cancel(jid) - return unless jid - - # try to find thread without lock - return unless find_thread_unsafe(jid) - - Thread.new do - # try to find a thread, but with guaranteed - # that handle for thread corresponds to actually - # running job - find_thread_with_lock(jid) do |thread| - Sidekiq.logger.warn( - class: self.class.to_s, - action: 'cancel', - message: 'Canceling thread with CancelledError', - jid: jid, - thread_id: thread.object_id - ) - - thread&.raise(CancelledError) - end - end - end - - # This method needs to be thread-safe - # This is why it passes thread in block, - # to ensure that we do process this thread - def find_thread_unsafe(jid) - jobs_thread[jid] - end - - def find_thread_with_lock(jid) - # don't try to lock if we cannot find the thread - return unless find_thread_unsafe(jid) - - jobs_mutex.synchronize do - find_thread_unsafe(jid).tap do |thread| - yield(thread) if thread - end - end - end - - def cancelled?(jid) - ::Gitlab::Redis::SharedState.with do |redis| - redis.exists(self.class.cancel_job_key(jid)) - end - end - - def self.cancel_job_key(jid) - "sidekiq:cancel:#{jid}" - end - end -end diff --git a/spec/lib/gitlab/sidekiq_daemon/monitor_spec.rb b/spec/lib/gitlab/sidekiq_daemon/monitor_spec.rb new file mode 100644 index 00000000000..acbb09e3542 --- /dev/null +++ b/spec/lib/gitlab/sidekiq_daemon/monitor_spec.rb @@ -0,0 +1,261 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Gitlab::SidekiqDaemon::Monitor do + let(:monitor) { described_class.new } + + describe '#within_job' do + it 'tracks thread' do + blk = proc do + expect(monitor.jobs_thread['jid']).not_to be_nil + + "OK" + end + + expect(monitor.within_job('jid', 'queue', &blk)).to eq("OK") + end + + context 'when job is canceled' do + let(:jid) { SecureRandom.hex } + + before do + described_class.cancel_job(jid) + end + + it 'does not execute a block' do + expect do |blk| + monitor.within_job(jid, 'queue', &blk) + rescue described_class::CancelledError + end.not_to yield_control + end + + it 'raises exception' do + expect { monitor.within_job(jid, 'queue') }.to raise_error( + described_class::CancelledError) + end + end + end + + describe '#start_working' do + subject { monitor.send(:start_working) } + + before do + # we want to run at most once cycle + # we toggle `enabled?` flag after the first call + stub_const('Gitlab::SidekiqDaemon::Monitor::RECONNECT_TIME', 0) + allow(monitor).to receive(:enabled?).and_return(true, false) + + allow(Sidekiq.logger).to receive(:info) + allow(Sidekiq.logger).to receive(:warn) + end + + context 'when structured logging is used' do + it 'logs start message' do + expect(Sidekiq.logger).to receive(:info) + .with( + class: described_class.to_s, + action: 'start', + message: 'Starting Monitor Daemon') + + expect(::Gitlab::Redis::SharedState).to receive(:with) + + subject + end + + it 'logs stop message' do + expect(Sidekiq.logger).to receive(:warn) + .with( + class: described_class.to_s, + action: 'stop', + message: 'Stopping Monitor Daemon') + + expect(::Gitlab::Redis::SharedState).to receive(:with) + + subject + end + + it 'logs StandardError message' do + expect(Sidekiq.logger).to receive(:warn) + .with( + class: described_class.to_s, + action: 'exception', + message: 'My Exception') + + expect(::Gitlab::Redis::SharedState).to receive(:with) + .and_raise(StandardError, 'My Exception') + + expect { subject }.not_to raise_error + end + + it 'logs and raises Exception message' do + expect(Sidekiq.logger).to receive(:warn) + .with( + class: described_class.to_s, + action: 'exception', + message: 'My Exception') + + expect(::Gitlab::Redis::SharedState).to receive(:with) + .and_raise(Exception, 'My Exception') + + expect { subject }.to raise_error(Exception, 'My Exception') + end + end + + context 'when StandardError is raised' do + it 'does retry connection' do + expect(::Gitlab::Redis::SharedState).to receive(:with) + .and_raise(StandardError, 'My Exception') + + expect(::Gitlab::Redis::SharedState).to receive(:with) + + # we expect to run `process_messages` twice + expect(monitor).to receive(:enabled?).and_return(true, true, false) + + subject + end + end + + context 'when message is published' do + let(:subscribed) { double } + + before do + expect_any_instance_of(::Redis).to receive(:subscribe) + .and_yield(subscribed) + + expect(subscribed).to receive(:message) + .and_yield( + described_class::NOTIFICATION_CHANNEL, + payload + ) + + expect(Sidekiq.logger).to receive(:info) + .with( + class: described_class.to_s, + action: 'start', + message: 'Starting Monitor Daemon') + + expect(Sidekiq.logger).to receive(:info) + .with( + class: described_class.to_s, + channel: described_class::NOTIFICATION_CHANNEL, + message: 'Received payload on channel', + payload: payload + ) + end + + context 'and message is valid' do + let(:payload) { '{"action":"cancel","jid":"my-jid"}' } + + it 'processes cancel' do + expect(monitor).to receive(:process_job_cancel).with('my-jid') + + subject + end + end + + context 'and message is not valid json' do + let(:payload) { '{"action"}' } + + it 'skips processing' do + expect(monitor).not_to receive(:process_job_cancel) + + subject + end + end + end + end + + describe '#stop' do + let!(:monitor_thread) { monitor.start } + + it 'does stop the thread' do + expect(monitor_thread).to be_alive + + expect { monitor.stop }.not_to raise_error + + expect(monitor_thread).not_to be_alive + expect { monitor_thread.value }.to raise_error(Interrupt) + end + end + + describe '#process_job_cancel' do + subject { monitor.send(:process_job_cancel, jid) } + + context 'when jid is missing' do + let(:jid) { nil } + + it 'does not run thread' do + expect(subject).to be_nil + end + end + + context 'when jid is provided' do + let(:jid) { 'my-jid' } + + context 'when jid is not found' do + it 'does not log cancellation message' do + expect(Sidekiq.logger).not_to receive(:warn) + expect(subject).to be_nil + end + end + + context 'when jid is found' do + let(:thread) { Thread.new { sleep 1000 } } + + before do + monitor.jobs_thread[jid] = thread + end + + after do + thread.kill + rescue + end + + it 'does log cancellation message' do + expect(Sidekiq.logger).to receive(:warn) + .with( + class: described_class.to_s, + action: 'cancel', + message: 'Canceling thread with CancelledError', + jid: 'my-jid', + thread_id: thread.object_id) + + expect(subject).to be_a(Thread) + + subject.join + end + + it 'does cancel the thread' do + expect(subject).to be_a(Thread) + + subject.join + + # we wait for the thread to be cancelled + # by `process_job_cancel` + expect { thread.join(5) }.to raise_error(described_class::CancelledError) + end + end + end + end + + describe '.cancel_job' do + subject { described_class.cancel_job('my-jid') } + + it 'sets a redis key' do + expect_any_instance_of(::Redis).to receive(:setex) + .with('sidekiq:cancel:my-jid', anything, 1) + + subject + end + + it 'notifies all workers' do + payload = '{"action":"cancel","jid":"my-jid"}' + + expect_any_instance_of(::Redis).to receive(:publish) + .with('sidekiq:cancel:notifications', payload) + + subject + end + end +end diff --git a/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb index 7319cdc2399..023df1a6391 100644 --- a/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb +++ b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb @@ -10,8 +10,8 @@ describe Gitlab::SidekiqMiddleware::Monitor do let(:job) { { 'jid' => 'job-id' } } let(:queue) { 'my-queue' } - it 'calls SidekiqMonitor' do - expect(Gitlab::SidekiqMonitor.instance).to receive(:within_job) + it 'calls Gitlab::SidekiqDaemon::Monitor' do + expect(Gitlab::SidekiqDaemon::Monitor.instance).to receive(:within_job) .with('job-id', 'my-queue') .and_call_original @@ -29,7 +29,7 @@ describe Gitlab::SidekiqMiddleware::Monitor do context 'when cancel happens' do subject do monitor.call(worker, job, queue) do - raise Gitlab::SidekiqMonitor::CancelledError + raise Gitlab::SidekiqDaemon::Monitor::CancelledError end end diff --git a/spec/lib/gitlab/sidekiq_monitor_spec.rb b/spec/lib/gitlab/sidekiq_monitor_spec.rb deleted file mode 100644 index bbd7bf90217..00000000000 --- a/spec/lib/gitlab/sidekiq_monitor_spec.rb +++ /dev/null @@ -1,261 +0,0 @@ -# frozen_string_literal: true - -require 'spec_helper' - -describe Gitlab::SidekiqMonitor do - let(:monitor) { described_class.new } - - describe '#within_job' do - it 'tracks thread' do - blk = proc do - expect(monitor.jobs_thread['jid']).not_to be_nil - - "OK" - end - - expect(monitor.within_job('jid', 'queue', &blk)).to eq("OK") - end - - context 'when job is canceled' do - let(:jid) { SecureRandom.hex } - - before do - described_class.cancel_job(jid) - end - - it 'does not execute a block' do - expect do |blk| - monitor.within_job(jid, 'queue', &blk) - rescue described_class::CancelledError - end.not_to yield_control - end - - it 'raises exception' do - expect { monitor.within_job(jid, 'queue') }.to raise_error( - described_class::CancelledError) - end - end - end - - describe '#start_working' do - subject { monitor.send(:start_working) } - - before do - # we want to run at most once cycle - # we toggle `enabled?` flag after the first call - stub_const('Gitlab::SidekiqMonitor::RECONNECT_TIME', 0) - allow(monitor).to receive(:enabled?).and_return(true, false) - - allow(Sidekiq.logger).to receive(:info) - allow(Sidekiq.logger).to receive(:warn) - end - - context 'when structured logging is used' do - it 'logs start message' do - expect(Sidekiq.logger).to receive(:info) - .with( - class: described_class.to_s, - action: 'start', - message: 'Starting Monitor Daemon') - - expect(::Gitlab::Redis::SharedState).to receive(:with) - - subject - end - - it 'logs stop message' do - expect(Sidekiq.logger).to receive(:warn) - .with( - class: described_class.to_s, - action: 'stop', - message: 'Stopping Monitor Daemon') - - expect(::Gitlab::Redis::SharedState).to receive(:with) - - subject - end - - it 'logs StandardError message' do - expect(Sidekiq.logger).to receive(:warn) - .with( - class: described_class.to_s, - action: 'exception', - message: 'My Exception') - - expect(::Gitlab::Redis::SharedState).to receive(:with) - .and_raise(StandardError, 'My Exception') - - expect { subject }.not_to raise_error - end - - it 'logs and raises Exception message' do - expect(Sidekiq.logger).to receive(:warn) - .with( - class: described_class.to_s, - action: 'exception', - message: 'My Exception') - - expect(::Gitlab::Redis::SharedState).to receive(:with) - .and_raise(Exception, 'My Exception') - - expect { subject }.to raise_error(Exception, 'My Exception') - end - end - - context 'when StandardError is raised' do - it 'does retry connection' do - expect(::Gitlab::Redis::SharedState).to receive(:with) - .and_raise(StandardError, 'My Exception') - - expect(::Gitlab::Redis::SharedState).to receive(:with) - - # we expect to run `process_messages` twice - expect(monitor).to receive(:enabled?).and_return(true, true, false) - - subject - end - end - - context 'when message is published' do - let(:subscribed) { double } - - before do - expect_any_instance_of(::Redis).to receive(:subscribe) - .and_yield(subscribed) - - expect(subscribed).to receive(:message) - .and_yield( - described_class::NOTIFICATION_CHANNEL, - payload - ) - - expect(Sidekiq.logger).to receive(:info) - .with( - class: described_class.to_s, - action: 'start', - message: 'Starting Monitor Daemon') - - expect(Sidekiq.logger).to receive(:info) - .with( - class: described_class.to_s, - channel: described_class::NOTIFICATION_CHANNEL, - message: 'Received payload on channel', - payload: payload - ) - end - - context 'and message is valid' do - let(:payload) { '{"action":"cancel","jid":"my-jid"}' } - - it 'processes cancel' do - expect(monitor).to receive(:process_job_cancel).with('my-jid') - - subject - end - end - - context 'and message is not valid json' do - let(:payload) { '{"action"}' } - - it 'skips processing' do - expect(monitor).not_to receive(:process_job_cancel) - - subject - end - end - end - end - - describe '#stop' do - let!(:monitor_thread) { monitor.start } - - it 'does stop the thread' do - expect(monitor_thread).to be_alive - - expect { monitor.stop }.not_to raise_error - - expect(monitor_thread).not_to be_alive - expect { monitor_thread.value }.to raise_error(Interrupt) - end - end - - describe '#process_job_cancel' do - subject { monitor.send(:process_job_cancel, jid) } - - context 'when jid is missing' do - let(:jid) { nil } - - it 'does not run thread' do - expect(subject).to be_nil - end - end - - context 'when jid is provided' do - let(:jid) { 'my-jid' } - - context 'when jid is not found' do - it 'does not log cancellation message' do - expect(Sidekiq.logger).not_to receive(:warn) - expect(subject).to be_nil - end - end - - context 'when jid is found' do - let(:thread) { Thread.new { sleep 1000 } } - - before do - monitor.jobs_thread[jid] = thread - end - - after do - thread.kill - rescue - end - - it 'does log cancellation message' do - expect(Sidekiq.logger).to receive(:warn) - .with( - class: described_class.to_s, - action: 'cancel', - message: 'Canceling thread with CancelledError', - jid: 'my-jid', - thread_id: thread.object_id) - - expect(subject).to be_a(Thread) - - subject.join - end - - it 'does cancel the thread' do - expect(subject).to be_a(Thread) - - subject.join - - # we wait for the thread to be cancelled - # by `process_job_cancel` - expect { thread.join(5) }.to raise_error(described_class::CancelledError) - end - end - end - end - - describe '.cancel_job' do - subject { described_class.cancel_job('my-jid') } - - it 'sets a redis key' do - expect_any_instance_of(::Redis).to receive(:setex) - .with('sidekiq:cancel:my-jid', anything, 1) - - subject - end - - it 'notifies all workers' do - payload = '{"action":"cancel","jid":"my-jid"}' - - expect_any_instance_of(::Redis).to receive(:publish) - .with('sidekiq:cancel:notifications', payload) - - subject - end - end -end -- cgit v1.2.1