summaryrefslogtreecommitdiff
path: root/lib/gitlab/background_migration/batched_migration_job.rb
blob: 4039a79cfa7acfb9d7263a765ee916b8889bd3f8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# frozen_string_literal: true

module Gitlab
  module BackgroundMigration
    # Base class for batched background migrations. Subclasses should implement the `#perform`
    # method as the entry point for the job's execution, and declare an operation name with
    # `operation_name` before iterating with `#each_sub_batch` or `#distinct_each_batch`
    # (the default `#operation_name` raises).
    #
    # Job arguments needed must be defined explicitly with `job_arguments`,
    # see https://docs.gitlab.com/ee/development/database/batched_background_migrations.html#job-arguments.
    #
    # Illustrative subclass:
    #
    #   class MyMigration < BatchedMigrationJob
    #     operation_name :update_all
    #     job_arguments :new_value
    #     scope_to ->(relation) { relation.where(archived: false) }
    #
    #     def perform
    #       each_sub_batch do |sub_batch|
    #         sub_batch.update_all(value: new_value)
    #       end
    #     end
    #   end
    class BatchedMigrationJob
      include Gitlab::Database::DynamicModelHelpers
      include Gitlab::ClassAttributes

      DEFAULT_FEATURE_CATEGORY = :database

      class << self
        # Builds an instance whose batch range, sub-batch size and pause are all zero.
        # Handy when only the job's configuration (table, column, job arguments, filters)
        # is needed rather than an actual batch run.
        def generic_instance(batch_table:, batch_column:, job_arguments: [], connection:)
          new(
            batch_table: batch_table, batch_column: batch_column,
            job_arguments: job_arguments, connection: connection,
            start_id: 0, end_id: 0, sub_batch_size: 0, pause_ms: 0
          )
        end

        # Number of job arguments declared with `job_arguments`. Returns 0 until
        # `job_arguments` is called, which redefines this method on the class.
        def job_arguments_count
          0
        end

        # Class macro: defines the instance method `#operation_name` to return `operation`.
        # The name is passed to `batch_metrics.instrument_operation` for every sub-batch.
        def operation_name(operation)
          define_method(:operation_name) do
            operation
          end
        end

        # Class macro: declares positional job arguments. For each name in `args` an
        # instance reader is defined that returns the value at the same position in
        # the `job_arguments:` array passed to `#initialize`.
        def job_arguments(*args)
          args.each_with_index do |arg, index|
            define_method(arg) do
              @job_arguments[index]
            end
          end

          define_singleton_method(:job_arguments_count) do
            args.count
          end
        end

        # Class macro: overrides `#filter_batch` so the given callable is evaluated in
        # the job instance's context (via `instance_exec`) with the batch relation as
        # its argument. Its return value becomes the filtered relation.
        def scope_to(scope)
          define_method(:filter_batch) do |relation|
            instance_exec(relation, &scope)
          end
        end

        # Combined setter/getter. With a present argument it stores the feature category
        # as a class attribute; without one it returns the stored category, falling
        # back to DEFAULT_FEATURE_CATEGORY.
        def feature_category(feature_category_name = nil)
          if feature_category_name.present?
            set_class_attribute(:feature_category, feature_category_name)
          else
            get_class_attribute(:feature_category) || DEFAULT_FEATURE_CATEGORY
          end
        end
      end

      # @param start_id / end_id [Object] inclusive bounds on `batch_column` for this batch
      # @param batch_table [String, Symbol] table the batchable model is built for
      # @param batch_column [String, Symbol] column used for the range and for sub-batching
      # @param sub_batch_size [Integer] default `of:` size passed to the batching iterator
      # @param pause_ms [Numeric] milliseconds to sleep after each sub-batch (negative => 0)
      # @param job_arguments [Array] positional values read by `job_arguments` readers
      # @param connection database connection used for the batchable model
      def initialize(
        start_id:, end_id:, batch_table:, batch_column:, sub_batch_size:, pause_ms:, job_arguments: [], connection:
      )
        @start_id = start_id
        @end_id = end_id
        @batch_table = batch_table
        @batch_column = batch_column
        @sub_batch_size = sub_batch_size
        @pause_ms = pause_ms
        @job_arguments = job_arguments
        @connection = connection
      end

      # Hook for narrowing the batch relation. The default returns it unchanged;
      # `scope_to` replaces it on subclasses.
      def filter_batch(relation)
        relation
      end

      # Entry point of the job; must be implemented by subclasses.
      #
      # @raise [NotImplementedError] always, in this base class
      def perform
        raise NotImplementedError, "subclasses of #{self.class.name} must implement #{__method__}"
      end

      # Memoized metrics collector; every sub-batch yielded by the iteration helpers
      # is wrapped in `instrument_operation` on it.
      def batch_metrics
        @batch_metrics ||= Gitlab::Database::BackgroundMigration::BatchMetrics.new
      end

      private

      attr_reader :start_id, :end_id, :batch_table, :batch_column, :sub_batch_size, :pause_ms, :connection

      # Iterates over the filtered batch with `each_batch`, yielding each sub-batch
      # relation inside `batch_metrics.instrument_operation(operation_name)` and then
      # sleeping `pause_ms` milliseconds.
      #
      # @param batching_arguments [Hash] merged over the defaults
      #   `{ column: batch_column, of: sub_batch_size }` and forwarded to `each_batch`
      # @param batching_scope [#call, nil] optional callable applied to the filtered
      #   relation before iterating
      def each_sub_batch(batching_arguments: {}, batching_scope: nil)
        all_batching_arguments = { column: batch_column, of: sub_batch_size }.merge(batching_arguments)

        filtered_relation = filter_batch(base_relation)
        sub_batch_relation = filter_sub_batch(filtered_relation, batching_scope)

        # Distinct name for the block parameter so it does not shadow the outer relation.
        sub_batch_relation.each_batch(**all_batching_arguments) do |sub_batch|
          batch_metrics.instrument_operation(operation_name) do
            yield sub_batch
          end

          # Negative pauses are clamped to zero; pause_ms is milliseconds, sleep takes seconds.
          sleep([pause_ms, 0].max * 0.001)
        end
      end

      # Like `#each_sub_batch`, but iterates with `distinct_each_batch` on the
      # unfiltered base relation.
      #
      # @raise [RuntimeError] when `filter_batch` changes the base relation, i.e. a
      #   `scope_to` filter is in effect, because the filter would not be applied here.
      #   NOTE(review): `!=` on ActiveRecord relations compares their generated SQL.
      def distinct_each_batch(batching_arguments: {})
        if base_relation != filter_batch(base_relation)
          raise 'distinct_each_batch can not be used when additional filters are defined with scope_to'
        end

        all_batching_arguments = { column: batch_column, of: sub_batch_size }.merge(batching_arguments)

        base_relation.distinct_each_batch(**all_batching_arguments) do |relation|
          batch_metrics.instrument_operation(operation_name) do
            yield relation
          end

          # Negative pauses are clamped to zero; pause_ms is milliseconds, sleep takes seconds.
          sleep([pause_ms, 0].max * 0.001)
        end
      end

      # Relation over `batch_table` (built with `define_batchable_model` on `connection`)
      # restricted to rows whose `batch_column` lies in the inclusive range start_id..end_id.
      def base_relation
        define_batchable_model(batch_table, connection: connection)
          .where(batch_column => start_id..end_id)
      end

      # Applies `batching_scope` (any object responding to `call`) to `relation`,
      # or returns `relation` untouched when no scope is given.
      def filter_sub_batch(relation, batching_scope = nil)
        return relation unless batching_scope

        batching_scope.call(relation)
      end

      # Default implementation that forces subclasses to declare one with the
      # `operation_name` class macro, which overrides this method.
      def operation_name
        raise('Operation name is required, please define it with `operation_name`')
      end
    end
  end
end