summaryrefslogtreecommitdiff
path: root/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
blob: c6fb50b46109dc9d6dbcf9972769dda16df7e869 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# frozen_string_literal: true

require 'digest'

module Gitlab
  module SidekiqMiddleware
    module DuplicateJobs
      # This class defines an identifier of a job in a queue.
      # The identifier is based on a job's class and arguments.
      #
      # A strategy decides when to keep track of the job in redis and when to
      # remove it.
      #
      # Storing the deduplication key in redis can be done by calling `#check!`.
      # `#check!` returns the `jid` of the job if it was scheduled, or the `jid` of
      # the duplicate job if it was already scheduled.
      #
      # When new jobs can be scheduled again, the strategy calls `#delete!`.
      class DuplicateJob
        # Expiry applied to the deduplication key when it is written in `#check!`.
        DUPLICATE_KEY_TTL = 6.hours

        # The jid found under this job's idempotency key; set by `#check!`.
        attr_reader :existing_jid

        # job        - the job hash; its 'class', 'args' and 'jid' entries are read
        # queue_name - name of the queue, used to namespace the redis key
        # strategy   - name of the deduplication strategy, resolved through
        #              `Strategies.for`
        def initialize(job, queue_name, strategy: :until_executing)
          @job = job
          @queue_name = queue_name
          @strategy = strategy
        end

        # This will continue the middleware chain if the job should be scheduled
        # It will return false if the job needs to be cancelled
        def schedule(&block)
          Strategies.for(strategy).new(self).schedule(job, &block)
        end

        # This will continue the server middleware chain if the job should be
        # executed.
        # It will return false if the job should not be executed.
        def perform(&block)
          Strategies.for(strategy).new(self).perform(job, &block)
        end

        # Tries to claim the idempotency key for this job and records which jid
        # holds it.
        #
        # The key is written with `nx: true`, so it is only set when it does not
        # exist yet. The `get` queued in the same transaction therefore yields
        # this job's own jid when the key was claimed, or the jid that was
        # already stored otherwise.
        #
        # Returns the jid stored in redis, which is also kept in `#existing_jid`.
        def check!
          read_jid = nil

          Sidekiq.redis do |redis|
            redis.multi do |multi|
              # Commands must be issued on the yielded transaction object: recent
              # redis-rb releases no longer queue calls made on the outer
              # connection from inside the block. Older releases yield the
              # connection itself, so this form works for both.
              #
              # The TTL is sent as an integer number of seconds rather than as a
              # duration object.
              multi.set(idempotency_key, jid, ex: DUPLICATE_KEY_TTL.to_i, nx: true)
              read_jid = multi.get(idempotency_key)
            end
          end

          # Inside the transaction `get` returns a future; its value is only
          # available after the transaction has been executed.
          self.existing_jid = read_jid.value
        end

        # Removes the idempotency key from redis.
        def delete!
          Sidekiq.redis do |redis|
            redis.del(idempotency_key)
          end
        end

        # Whether the idempotency key is held by a job other than this one.
        #
        # Raises a RuntimeError when `#check!` has not been called yet, because
        # `existing_jid` is only populated there.
        def duplicate?
          raise "Call `#check!` first to check for existing duplicates" unless existing_jid

          jid != existing_jid
        end

        # Whether this job may be dropped: its worker is idempotent, it is a
        # duplicate, and `DuplicateJobs.drop_duplicates?` is true.
        def droppable?
          idempotent? && duplicate? && DuplicateJobs.drop_duplicates?
        end

        private

        attr_reader :queue_name, :strategy, :job
        attr_writer :existing_jid

        def worker_class_name
          job['class']
        end

        def arguments
          job['args']
        end

        def jid
          job['jid']
        end

        # The redis key under which the jid for this class/arguments combination
        # is stored.
        def idempotency_key
          @idempotency_key ||= "#{namespace}:#{idempotency_hash}"
        end

        def idempotency_hash
          Digest::SHA256.hexdigest(idempotency_string)
        end

        def namespace
          "#{Gitlab::Redis::Queues::SIDEKIQ_NAMESPACE}:duplicate:#{queue_name}"
        end

        # The string that gets hashed into the idempotency key.
        #
        # The arguments are serialised as JSON instead of being joined with a
        # separator: joining is ambiguous (`[1, 23]` and `['1-23']` would both
        # become "1-23", and nested arrays would be flattened), which could make
        # two different jobs look like duplicates of each other.
        def idempotency_string
          "#{worker_class_name}:#{Sidekiq.dump_json(arguments)}"
        end

        # Whether the worker class declares itself idempotent. Returns false when
        # the class name cannot be resolved to a constant or the class does not
        # respond to `idempotent?`.
        def idempotent?
          worker_class = worker_class_name.to_s.safe_constantize
          return false unless worker_class
          return false unless worker_class.respond_to?(:idempotent?)

          worker_class.idempotent?
        end
      end
    end
  end
end