summaryrefslogtreecommitdiff
path: root/vendor/gems/sidekiq-reliable-fetch/tests/reliability/reliability_test.rb
blob: 6324971fe8fa3d6f89805c223cefdba022921490 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# frozen_string_literal: true

require 'sidekiq'
require 'sidekiq/cli'
require_relative 'config'

def spawn_workers_and_stop_them_on_a_half_way
  pids = spawn_workers

  wait_until do |queue_size|
    queue_size < NUMBER_OF_JOBS / 2
  end

  first_half_pids, second_half_pids = split_array(pids)

  puts 'Killing half of the workers...'
  signal_to_workers('KILL', first_half_pids)

  puts 'Stopping another half of the workers...'
  signal_to_workers('TERM', second_half_pids)
end

def spawn_workers_and_let_them_finish
  puts 'Spawn workers and let them finish...'

  pids = spawn_workers

  wait_until do |queue_size|
    queue_size.zero?
  end

  if %i[semi reliable].include? JOB_FETCHER
    puts 'Waiting for clean up process that will requeue dead jobs...'
    sleep WAIT_CLEANUP
  end

  signal_to_workers('TERM', pids)
end

def wait_until
  loop do
    sleep 3

    queue_size = current_queue_size
    puts "Jobs in the queue:#{queue_size}"

    break if yield(queue_size)
  end
end

def signal_to_workers(signal, pids)
  pids.each { |pid| Process.kill(signal, pid) }
  pids.each { |pid| Process.wait(pid) }
end

def spawn_workers
  pids = []
  NUMBER_OF_WORKERS.times do
    pids << spawn('sidekiq -q default -q low -q high -r ./config.rb')
  end

  pids
end

def current_queue_size
  Sidekiq.redis { |c| c.llen('queue:default') }
end

def duplicates
  Sidekiq.redis { |c| c.llen(REDIS_FINISHED_LIST) }
end

# Splits array into two halves
def split_array(arr)
  first_arr = arr.take(arr.size / 2)
  second_arr = arr - first_arr
  [first_arr, second_arr]
end

##########################################################

puts '########################################'
puts "Mode: #{JOB_FETCHER}"
puts '########################################'

Sidekiq.redis(&:flushdb)

jobs = []

NUMBER_OF_JOBS.times do
  jobs << ReliabilityTestWorker.perform_async
end

puts "Queued #{NUMBER_OF_JOBS} jobs"

spawn_workers_and_stop_them_on_a_half_way
spawn_workers_and_let_them_finish

jobs_lost = 0

Sidekiq.redis do |redis|
  jobs.each do |job|
    next if redis.lrem(REDIS_FINISHED_LIST, 1, job) == 1
    jobs_lost += 1
  end
end

puts "Remaining unprocessed: #{jobs_lost}"
puts "Duplicates found: #{duplicates}"

if jobs_lost.zero? && duplicates.zero?
  exit 0
else
  exit 1
end