diff options
Diffstat (limited to 'vendor/gems/sidekiq-reliable-fetch/spec/fetch_shared_examples.rb')
-rw-r--r-- | vendor/gems/sidekiq-reliable-fetch/spec/fetch_shared_examples.rb | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/vendor/gems/sidekiq-reliable-fetch/spec/fetch_shared_examples.rb b/vendor/gems/sidekiq-reliable-fetch/spec/fetch_shared_examples.rb new file mode 100644 index 00000000000..df7f715f2f9 --- /dev/null +++ b/vendor/gems/sidekiq-reliable-fetch/spec/fetch_shared_examples.rb @@ -0,0 +1,195 @@ +shared_examples 'a Sidekiq fetcher' do + let(:queues) { ['assigned'] } + + before { Sidekiq.redis(&:flushdb) } + + describe '#retrieve_work' do + let(:job) { Sidekiq.dump_json(class: 'Bob', args: [1, 2, 'foo']) } + let(:fetcher) { described_class.new(queues: queues) } + + it 'does not clean up orphaned jobs more than once per cleanup interval' do + Sidekiq.redis = Sidekiq::RedisConnection.create(url: REDIS_URL, size: 10) + + expect(fetcher).to receive(:clean_working_queues!).once + + threads = 10.times.map do + Thread.new do + fetcher.retrieve_work + end + end + + threads.map(&:join) + end + + it 'retrieves by order when strictly order is enabled' do + fetcher = described_class.new(strict: true, queues: ['first', 'second']) + + Sidekiq.redis do |conn| + conn.rpush('queue:first', ['msg3', 'msg2', 'msg1']) + conn.rpush('queue:second', 'msg4') + end + + jobs = (1..4).map { fetcher.retrieve_work.job } + + expect(jobs).to eq ['msg1', 'msg2', 'msg3', 'msg4'] + end + + it 'does not starve any queue when queues are not strictly ordered' do + fetcher = described_class.new(queues: ['first', 'second']) + + Sidekiq.redis do |conn| + conn.rpush('queue:first', (1..200).map { |i| "msg#{i}" }) + conn.rpush('queue:second', 'this_job_should_not_stuck') + end + + jobs = (1..100).map { fetcher.retrieve_work.job } + + expect(jobs).to include 'this_job_should_not_stuck' + end + + shared_examples "basic queue handling" do |queue| + let (:fetcher) { described_class.new(queues: [queue]) } + + it 'retrieves the job and puts it to working queue' do + Sidekiq.redis { |conn| conn.rpush("queue:#{queue}", job) } + + uow = fetcher.retrieve_work + + expect(working_queue_size(queue)).to eq 1 + expect(uow.queue_name).to eq queue + expect(uow.job).to eq job + expect(Sidekiq::Queue.new(queue).size).to eq 0 + end + + it 'does not retrieve a job from foreign queue' do + Sidekiq.redis { |conn| conn.rpush("'queue:#{queue}:not", job) } + expect(fetcher.retrieve_work).to be_nil + + Sidekiq.redis { |conn| conn.rpush("'queue:not_#{queue}", job) } + expect(fetcher.retrieve_work).to be_nil + + Sidekiq.redis { |conn| conn.rpush("'queue:random_name", job) } + expect(fetcher.retrieve_work).to be_nil + end + + it 'requeues jobs from legacy dead working queue with incremented interrupted_count' do + Sidekiq.redis do |conn| + conn.rpush(legacy_other_process_working_queue_name(queue), job) + end + + expected_job = Sidekiq.load_json(job) + expected_job['interrupted_count'] = 1 + expected_job = Sidekiq.dump_json(expected_job) + + uow = fetcher.retrieve_work + + expect(uow).to_not be_nil + expect(uow.job).to eq expected_job + + Sidekiq.redis do |conn| + expect(conn.llen(legacy_other_process_working_queue_name(queue))).to eq 0 + end + end + + it 'ignores working queue keys in unknown formats' do + # Add a spurious non-numeric char segment at the end; this simulates any other + # incorrect form in general + malformed_key = "#{other_process_working_queue_name(queue)}:X" + Sidekiq.redis do |conn| + conn.rpush(malformed_key, job) + end + + uow = fetcher.retrieve_work + + Sidekiq.redis do |conn| + expect(conn.llen(malformed_key)).to eq 1 + end + end + + it 'requeues jobs from dead working queue with incremented interrupted_count' do + Sidekiq.redis do |conn| + conn.rpush(other_process_working_queue_name(queue), job) + end + + expected_job = Sidekiq.load_json(job) + expected_job['interrupted_count'] = 1 + expected_job = Sidekiq.dump_json(expected_job) + + uow = fetcher.retrieve_work + + expect(uow).to_not be_nil + expect(uow.job).to eq expected_job + + Sidekiq.redis do |conn| + expect(conn.llen(other_process_working_queue_name(queue))).to eq 0 + end + end + + it 'does not requeue jobs from live working queue' do + working_queue = live_other_process_working_queue_name(queue) + + Sidekiq.redis do |conn| + conn.rpush(working_queue, job) + end + + uow = fetcher.retrieve_work + + expect(uow).to be_nil + + Sidekiq.redis do |conn| + expect(conn.llen(working_queue)).to eq 1 + end + end + end + + context 'with various queues' do + %w[assigned namespace:assigned namespace:deeper:assigned].each do |queue| + it_behaves_like "basic queue handling", queue + end + end + + context 'with short cleanup interval' do + let(:short_interval) { 1 } + let(:fetcher) { described_class.new(queues: queues, lease_interval: short_interval, cleanup_interval: short_interval) } + + it 'requeues when there is no heartbeat' do + Sidekiq.redis { |conn| conn.rpush('queue:assigned', job) } + # Use of retrieve_work twice with a sleep ensures we have exercised the + # `identity` method to create the working queue key name and that it + # matches the patterns used in the cleanup + uow = fetcher.retrieve_work + sleep(short_interval + 1) + uow = fetcher.retrieve_work + + # Will only receive a UnitOfWork if the job was detected as failed and requeued + expect(uow).to_not be_nil + end + end + end +end + +def working_queue_size(queue_name) + Sidekiq.redis do |c| + c.llen(Sidekiq::BaseReliableFetch.working_queue_name("queue:#{queue_name}")) + end +end + +def legacy_other_process_working_queue_name(queue) + "#{Sidekiq::BaseReliableFetch::WORKING_QUEUE_PREFIX}:queue:#{queue}:#{Socket.gethostname}:#{::Process.pid + 1}" +end + +def other_process_working_queue_name(queue) + "#{Sidekiq::BaseReliableFetch::WORKING_QUEUE_PREFIX}:queue:#{queue}:#{Socket.gethostname}:#{::Process.pid + 1}:#{::SecureRandom.hex(6)}" +end + +def live_other_process_working_queue_name(queue) + pid = ::Process.pid + 1 + hostname = Socket.gethostname + nonce = SecureRandom.hex(6) + + Sidekiq.redis do |conn| + conn.set(Sidekiq::BaseReliableFetch.heartbeat_key("#{hostname}-#{pid}-#{nonce}"), 1) + end + + "#{Sidekiq::BaseReliableFetch::WORKING_QUEUE_PREFIX}:queue:#{queue}:#{hostname}:#{pid}:#{nonce}" +end |