summaryrefslogtreecommitdiff
path: root/vendor/gems/sidekiq-reliable-fetch/spec/base_reliable_fetch_spec.rb
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/gems/sidekiq-reliable-fetch/spec/base_reliable_fetch_spec.rb')
-rw-r--r--vendor/gems/sidekiq-reliable-fetch/spec/base_reliable_fetch_spec.rb97
1 files changed, 97 insertions, 0 deletions
diff --git a/vendor/gems/sidekiq-reliable-fetch/spec/base_reliable_fetch_spec.rb b/vendor/gems/sidekiq-reliable-fetch/spec/base_reliable_fetch_spec.rb
new file mode 100644
index 00000000000..cdc4409f0d5
--- /dev/null
+++ b/vendor/gems/sidekiq-reliable-fetch/spec/base_reliable_fetch_spec.rb
@@ -0,0 +1,97 @@
+require 'spec_helper'
+require 'fetch_shared_examples'
+require 'sidekiq/base_reliable_fetch'
+require 'sidekiq/reliable_fetch'
+require 'sidekiq/semi_reliable_fetch'
+
+describe Sidekiq::BaseReliableFetch do
+ let(:job) { Sidekiq.dump_json(class: 'Bob', args: [1, 2, 'foo']) }
+
+ before { Sidekiq.redis(&:flushdb) }
+
+ describe 'UnitOfWork' do
+ let(:fetcher) { Sidekiq::ReliableFetch.new(queues: ['foo']) }
+
+ describe '#requeue' do
+ it 'requeues job' do
+ Sidekiq.redis { |conn| conn.rpush('queue:foo', job) }
+
+ uow = fetcher.retrieve_work
+
+ uow.requeue
+
+ expect(Sidekiq::Queue.new('foo').size).to eq 1
+ expect(working_queue_size('foo')).to eq 0
+ end
+ end
+
+ describe '#acknowledge' do
+ it 'acknowledges job' do
+ Sidekiq.redis { |conn| conn.rpush('queue:foo', job) }
+
+ uow = fetcher.retrieve_work
+
+ expect { uow.acknowledge }
+ .to change { working_queue_size('foo') }.by(-1)
+
+ expect(Sidekiq::Queue.new('foo').size).to eq 0
+ end
+ end
+ end
+
+ describe '#bulk_requeue' do
+ let(:options) { { queues: %w[foo bar] } }
+ let!(:queue1) { Sidekiq::Queue.new('foo') }
+ let!(:queue2) { Sidekiq::Queue.new('bar') }
+
+ it 'requeues the bulk' do
+ uow = described_class::UnitOfWork
+ jobs = [ uow.new('queue:foo', job), uow.new('queue:foo', job), uow.new('queue:bar', job) ]
+ described_class.new(options).bulk_requeue(jobs, nil)
+
+ expect(queue1.size).to eq 2
+ expect(queue2.size).to eq 1
+ end
+
+ it 'puts jobs into interrupted queue' do
+ uow = described_class::UnitOfWork
+ interrupted_job = Sidekiq.dump_json(class: 'Bob', args: [1, 2, 'foo'], interrupted_count: 3)
+ jobs = [ uow.new('queue:foo', interrupted_job), uow.new('queue:foo', job), uow.new('queue:bar', job) ]
+ described_class.new(options).bulk_requeue(jobs, nil)
+
+ expect(queue1.size).to eq 1
+ expect(queue2.size).to eq 1
+ expect(Sidekiq::InterruptedSet.new.size).to eq 1
+ end
+
+ it 'does not put jobs into interrupted queue if it is disabled' do
+ options[:max_retries_after_interruption] = -1
+
+ uow = described_class::UnitOfWork
+ interrupted_job = Sidekiq.dump_json(class: 'Bob', args: [1, 2, 'foo'], interrupted_count: 3)
+ jobs = [ uow.new('queue:foo', interrupted_job), uow.new('queue:foo', job), uow.new('queue:bar', job) ]
+ described_class.new(options).bulk_requeue(jobs, nil)
+
+ expect(queue1.size).to eq 2
+ expect(queue2.size).to eq 1
+ expect(Sidekiq::InterruptedSet.new.size).to eq 0
+ end
+ end
+
+ it 'sets heartbeat' do
+ config = double(:sidekiq_config, options: { queues: %w[foo bar] })
+
+ heartbeat_thread = described_class.setup_reliable_fetch!(config)
+
+ Sidekiq.redis do |conn|
+ sleep 0.2 # Give the time to heartbeat thread to make a loop
+
+ heartbeat_key = described_class.heartbeat_key(described_class.identity)
+ heartbeat = conn.get(heartbeat_key)
+
+ expect(heartbeat).not_to be_nil
+ end
+
+ heartbeat_thread.kill
+ end
+end