blob: 6324971fe8fa3d6f89805c223cefdba022921490 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
# frozen_string_literal: true
require 'sidekiq'
require 'sidekiq/cli'
require_relative 'config'
def spawn_workers_and_stop_them_on_a_half_way
pids = spawn_workers
wait_until do |queue_size|
queue_size < NUMBER_OF_JOBS / 2
end
first_half_pids, second_half_pids = split_array(pids)
puts 'Killing half of the workers...'
signal_to_workers('KILL', first_half_pids)
puts 'Stopping another half of the workers...'
signal_to_workers('TERM', second_half_pids)
end
def spawn_workers_and_let_them_finish
puts 'Spawn workers and let them finish...'
pids = spawn_workers
wait_until do |queue_size|
queue_size.zero?
end
if %i[semi reliable].include? JOB_FETCHER
puts 'Waiting for clean up process that will requeue dead jobs...'
sleep WAIT_CLEANUP
end
signal_to_workers('TERM', pids)
end
def wait_until
loop do
sleep 3
queue_size = current_queue_size
puts "Jobs in the queue:#{queue_size}"
break if yield(queue_size)
end
end
def signal_to_workers(signal, pids)
pids.each { |pid| Process.kill(signal, pid) }
pids.each { |pid| Process.wait(pid) }
end
def spawn_workers
pids = []
NUMBER_OF_WORKERS.times do
pids << spawn('sidekiq -q default -q low -q high -r ./config.rb')
end
pids
end
def current_queue_size
Sidekiq.redis { |c| c.llen('queue:default') }
end
def duplicates
Sidekiq.redis { |c| c.llen(REDIS_FINISHED_LIST) }
end
# Splits array into two halves
def split_array(arr)
first_arr = arr.take(arr.size / 2)
second_arr = arr - first_arr
[first_arr, second_arr]
end
##########################################################
puts '########################################'
puts "Mode: #{JOB_FETCHER}"
puts '########################################'
Sidekiq.redis(&:flushdb)
jobs = []
NUMBER_OF_JOBS.times do
jobs << ReliabilityTestWorker.perform_async
end
puts "Queued #{NUMBER_OF_JOBS} jobs"
spawn_workers_and_stop_them_on_a_half_way
spawn_workers_and_let_them_finish
jobs_lost = 0
Sidekiq.redis do |redis|
jobs.each do |job|
next if redis.lrem(REDIS_FINISHED_LIST, 1, job) == 1
jobs_lost += 1
end
end
puts "Remaining unprocessed: #{jobs_lost}"
puts "Duplicates found: #{duplicates}"
if jobs_lost.zero? && duplicates.zero?
exit 0
else
exit 1
end
|