summaryrefslogtreecommitdiff
path: root/app/workers/concerns/application_worker.rb
blob: 222d045b0badb3f601e179be8d1a12a255425420 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# frozen_string_literal: true

require 'sidekiq/api'

Sidekiq::Worker.extend ActiveSupport::Concern # rubocop:disable Cop/SidekiqApiUsage

module ApplicationWorker
  extend ActiveSupport::Concern

  include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker
  include WorkerAttributes
  include WorkerContext
  include Gitlab::SidekiqVersioning::Worker

  LOGGING_EXTRA_KEY = 'extra'
  SAFE_PUSH_BULK_LIMIT = 1000

  included do
    prefer_calling_context_feature_category false
    set_queue
    after_set_class_attribute { set_queue }

    # Builds a structured logging payload: the caller-supplied fields plus
    # the current application context and this job's identity. Context and
    # job keys win on conflict, so callers cannot accidentally override
    # 'class', 'jid', etc.
    def structured_payload(payload = {})
      job_fields = {
        'class' => self.class.name,
        'job_status' => 'running',
        'queue' => self.class.queue,
        'jid' => jid
      }

      payload.stringify_keys.merge(Gitlab::ApplicationContext.current.merge(job_fields))
    end

    # Record a key/value pair to be appended to the job's completion log
    # entry (see #logging_extras). Values accumulate per job instance; a
    # repeated key overwrites the earlier value.
    def log_extra_metadata_on_done(key, value)
      (@done_log_extra_metadata ||= {})[key] = value
    end

    # Returns the metadata recorded via #log_extra_metadata_on_done, with
    # each key namespaced as "extra.<underscored_class>.<key>": the class
    # segment avoids type conflicts in Elasticsearch, and the "extra."
    # prefix marks the fields as ones we know to log.
    def logging_extras
      recorded = @done_log_extra_metadata
      return {} unless recorded

      key_prefix = "#{LOGGING_EXTRA_KEY}.#{self.class.name.gsub('::', '_').underscore}"
      recorded.transform_keys { |key| "#{key_prefix}.#{key}" }
    end
  end

  class_methods do
    extend ::Gitlab::Utils::Override

    # Mirrors the queue setup performed in the `included` hook for
    # subclasses: route the subclass to its own queue immediately, and
    # re-route whenever one of its class attributes changes afterwards
    # (routing presumably depends on those attributes — same pattern as
    # the after_set_class_attribute registration in the included block).
    def inherited(subclass)
      subclass.set_queue
      subclass.after_set_class_attribute { subclass.set_queue }
    end

    # Returns a job setter whose status_expiration is guaranteed to be
    # present: the class-level sidekiq option when configured, otherwise
    # the platform-wide default expiration.
    def with_status
      expiration = sidekiq_options_hash['status_expiration'] || Gitlab::SidekiqStatus::DEFAULT_EXPIRATION

      set(status_expiration: expiration)
    end

    # The queue name derived purely from this worker's class name, before
    # any routing rules are applied (contrast with .set_queue, which goes
    # through the global router).
    def generated_queue_name
      Gitlab::SidekiqConfig::WorkerRouter.queue_name_from_worker_name(self)
    end

    # Guards against configurations that can lose jobs: the :delayed
    # data_consistency relies on Sidekiq's built-in retry mechanism, so it
    # is incompatible with retries being disabled.
    #
    # Raises ArgumentError when the combination is invalid.
    def validate_worker_attributes!
      return unless get_data_consistency == :delayed
      return unless retry_disabled?

      raise ArgumentError, "Retry support cannot be disabled if data_consistency is set to :delayed"
    end

    # Checks if sidekiq retry support is disabled for this worker: the
    # 'retry' sidekiq option is either 0 (no attempts) or false (retries
    # switched off entirely).
    def retry_disabled?
      # Look the option hash up once instead of twice; equality semantics
      # are unchanged since Array#include? also compares with ==.
      [0, false].include?(get_sidekiq_options['retry'])
    end

    # Wraps Sidekiq's option setter so every option change is immediately
    # validated against our worker-attribute invariants.
    override :sidekiq_options
    def sidekiq_options(opts = {})
      result = super

      validate_worker_attributes!

      result
    end

    # Extends the inherited data_consistency setter to re-check invariants
    # afterwards: :delayed consistency requires retries to stay enabled.
    override :data_consistency
    def data_consistency(data_consistency, feature_flag: nil)
      super(data_consistency, feature_flag: feature_flag)

      validate_worker_attributes!
    end

    # Routes this worker class through the global worker router and stores
    # the result as its sidekiq queue option.
    def set_queue
      routed_queue = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self)

      sidekiq_options queue: routed_queue # rubocop:disable Cop/SidekiqOptionsQueue
    end

    # Reader/writer for the queue namespace. Called with an argument it
    # stores the namespace and re-routes the queue; called without one it
    # returns the configured namespace as a String (or nil when unset).
    def queue_namespace(new_namespace = nil)
      return get_sidekiq_options['queue_namespace']&.to_s unless new_namespace

      sidekiq_options queue_namespace: new_namespace
      set_queue
    end

    # The currently configured sidekiq queue name, always as a String
    # (an unset queue yields "").
    def queue
      configured = get_sidekiq_options['queue']

      configured.to_s
    end

    # Set/get which job arguments can be logged and sent to Sentry.
    #
    # Numeric arguments are logged by default, so they never need to be
    # listed. Non-numeric arguments must be listed by position, because
    # Sidekiq only sees positional values, not argument names.
    def loggable_arguments(*args)
      return @loggable_arguments = args if args.any?

      @loggable_arguments || []
    end

    # Whether .bulk_perform_async should emit per-call log entries for
    # this worker class. Falsy unless log_bulk_perform_async! was called.
    def log_bulk_perform_async?
      @log_bulk_perform_async
    end

    # Opt this worker class into logging every bulk_perform_async call.
    def log_bulk_perform_async!
      @log_bulk_perform_async = true
    end

    # Enqueues one job per element of args_list, batching the pushes, and
    # returns the list of inserted job IDs. Emits a before/after log line
    # when the class opted in via log_bulk_perform_async!.
    def bulk_perform_async(args_list)
      if log_bulk_perform_async?
        Sidekiq.logger.info(
          'class' => self.name,
          'args_list' => args_list,
          'args_list_count' => args_list.length,
          'message' => 'Inserting multiple jobs'
        )
      end

      job_ids = do_push_bulk(args_list)

      if log_bulk_perform_async?
        Sidekiq.logger.info(
          'class' => self.name,
          'jid_list' => job_ids,
          'jid_list_count' => job_ids.length,
          'message' => 'Completed JID insertion'
        )
      end

      job_ids
    end

    # Enqueues one job per element of args_list, scheduled `delay` seconds
    # from now. When batch_size and batch_delay are both given, jobs are
    # spread out: the Nth group of batch_size jobs is pushed back by an
    # extra N * batch_delay seconds.
    #
    # Raises ArgumentError when the delay is not in the future, or when
    # batch_size/batch_delay are given but not positive.
    def bulk_perform_in(delay, args_list, batch_size: nil, batch_delay: nil)
      now = Time.now.to_i
      base_schedule_at = now + delay.to_i

      raise ArgumentError, 'The schedule time must be in the future!' if base_schedule_at <= now

      schedule_at =
        if batch_size && batch_delay
          batch_size = batch_size.to_i
          batch_delay = batch_delay.to_i

          raise ArgumentError, 'batch_size should be greater than 0' unless batch_size > 0
          raise ArgumentError, 'batch_delay should be greater than 0' unless batch_delay > 0

          # One schedule entry per job in `args_list`: the job at `index`
          # belongs to batch `index / batch_size` and starts that many
          # batch_delays after the base schedule time.
          Array.new(args_list.size) do |index|
            base_schedule_at + ((index / batch_size) * batch_delay)
          end
        else
          base_schedule_at
        end

      in_safe_limit_batches(args_list, schedule_at) do |args_batch, schedule_at_for_batch|
        Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => schedule_at_for_batch) # rubocop:disable Cop/SidekiqApiUsage
      end
    end

    private

    # Pushes args_list to Redis in safe-sized chunks and returns all job
    # IDs from the individual push_bulk calls.
    def do_push_bulk(args_list)
      in_safe_limit_batches(args_list) do |args_batch, _schedule|
        Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch) # rubocop:disable Cop/SidekiqApiUsage
      end
    end

    # Yields `args_list` to the block in slices of at most `safe_limit`
    # jobs, so a single push_bulk call never grows unbounded. Returns the
    # flattened concatenation of the block results (typically job IDs).
    #
    # `schedule_at` may be:
    # - nil: no scheduling information.
    # - a single Numeric timestamp (e.g. `30.minutes.from_now.to_i`),
    #   applied to every batch.
    # - an Array of Numeric timestamps, one per element of `args_list`;
    #   it is sliced in lockstep with `args_list` so each batch receives
    #   the schedule entries for exactly its own jobs.
    def in_safe_limit_batches(args_list, schedule_at = nil, safe_limit = SAFE_PUSH_BULK_LIMIT)
      # Slice a per-job schedule array the same way as the args, keeping
      # the two lists aligned batch-for-batch.
      schedule_at = schedule_at.each_slice(safe_limit).to_a if schedule_at.is_a?(Array)

      args_batches = args_list.each_slice(safe_limit)

      args_batches.with_index.flat_map do |args_batch, batch_index|
        yield(args_batch, process_schedule_at_for_batch(schedule_at, batch_index))
      end
    end

    # Picks the scheduling value for the batch at `index`: the matching
    # slice when `schedule_at` is a sliced per-job schedule (an array of
    # arrays), otherwise the scalar schedule (or nil) unchanged.
    def process_schedule_at_for_batch(schedule_at, index)
      return unless schedule_at

      if schedule_at.is_a?(Array) && schedule_at.all?(Array)
        schedule_at[index]
      else
        schedule_at
      end
    end
  end
end