blob: 5b8a601dde1e9790994516b32560049f210fba35 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# frozen_string_literal: true
module Sidekiq
  # Fetch strategy that takes a job off a queue with a blocking BRPOP and then
  # records it, with a separate LPUSH, on the list named by
  # `working_queue_name` for that queue. The two Redis commands are issued in
  # separate `Sidekiq.redis` blocks, i.e. they are not a single atomic step.
  class SemiReliableFetch < BaseReliableFetch
    # We want the fetch operation to time out every few seconds so the thread
    # can check if the process is shutting down. This constant is only used
    # for semi-reliable fetch.
    DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT = 2 # seconds

    # @param options [Hash] forwarded unchanged to the superclass initializer
    def initialize(options)
      super

      return unless strictly_ordered_queues

      # With strict ordering the queue list is never reshuffled, so the BRPOP
      # argument list (queues followed by the timeout option) is built once.
      @queues = @queues.uniq
      @queues << { timeout: semi_reliable_fetch_timeout }
    end

    private

    # Pops one job and registers it on the matching working list.
    #
    # @return [UnitOfWork, nil] the fetched unit, or nil when BRPOP returned
    #   nothing (presumably because the timeout elapsed)
    def retrieve_unit_of_work
      popped = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
      return unless popped

      # NOTE(review): `popped` is presumably the [queue, job] pair returned by
      # BRPOP; it is splatted straight into UnitOfWork.
      unit_of_work = UnitOfWork.new(*popped)
      working_queue = self.class.working_queue_name(unit_of_work.queue)

      Sidekiq.redis { |conn| conn.lpush(working_queue, unit_of_work.job) }

      unit_of_work
    end

    # Arguments for BRPOP: the queue names followed by a `{ timeout: }` hash.
    #
    # @return [Array] the prebuilt list when queues are strictly ordered,
    #   otherwise a freshly shuffled, de-duplicated list on every call
    def queues_cmd
      return @queues if strictly_ordered_queues

      shuffled = @queues.shuffle.uniq
      shuffled << { timeout: semi_reliable_fetch_timeout }
      shuffled
    end

    # Seconds BRPOP may block before giving up.
    #
    # Read from SIDEKIQ_SEMI_RELIABLE_FETCH_TIMEOUT on first use and memoized.
    # A missing, blank, non-numeric, zero or negative value falls back to
    # DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT: String#to_i yields 0 for unparsable
    # input, and a BRPOP timeout of 0 blocks indefinitely, which would keep
    # the thread from ever checking for shutdown.
    #
    # @return [Integer] a positive number of seconds
    def semi_reliable_fetch_timeout
      @semi_reliable_fetch_timeout ||= begin
        configured = ENV['SIDEKIQ_SEMI_RELIABLE_FETCH_TIMEOUT'].to_i # nil.to_i => 0
        configured > 0 ? configured : DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT
      end
    end
  end
end
|