summaryrefslogtreecommitdiff
path: root/vendor/gems/sidekiq-reliable-fetch/lib/sidekiq/semi_reliable_fetch.rb
blob: 5b8a601dde1e9790994516b32560049f210fba35 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# frozen_string_literal: true

module Sidekiq
  class SemiReliableFetch < BaseReliableFetch
    # We want the fetch operation to timeout every few seconds so the thread
    # can check if the process is shutting down. This constant is only used
    # for semi-reliable fetch.
    DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT = 2 # seconds

    def initialize(options)
      super

      if strictly_ordered_queues
        @queues = @queues.uniq
        @queues << { timeout: semi_reliable_fetch_timeout }
      end
    end

    private

    def retrieve_unit_of_work
      work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
      return unless work

      unit_of_work = UnitOfWork.new(*work)

      Sidekiq.redis do |conn|
        conn.lpush(self.class.working_queue_name(unit_of_work.queue), unit_of_work.job)
      end

      unit_of_work
    end

    def queues_cmd
      if strictly_ordered_queues
        @queues
      else
        queues = @queues.shuffle.uniq
        queues << { timeout: semi_reliable_fetch_timeout }
        queues
      end
    end

    def semi_reliable_fetch_timeout
      @semi_reliable_fetch_timeout ||= ENV['SIDEKIQ_SEMI_RELIABLE_FETCH_TIMEOUT']&.to_i || DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT
    end
  end
end