1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# frozen_string_literal: true
module Gitlab
  module BackgroundMigration
    # Base class for batched background migrations. Subclasses should implement the `#perform`
    # method as the entry point for the job's execution.
    #
    # Job arguments needed must be defined explicitly,
    # see https://docs.gitlab.com/ee/development/database/batched_background_migrations.html#job-arguments.
    class BatchedMigrationJob
      include Gitlab::Database::DynamicModelHelpers
      include Gitlab::ClassAttributes

      DEFAULT_FEATURE_CATEGORY = :database

      class << self
        # Builds an instance for the given table, column, job arguments and connection,
        # with every batching parameter (start_id, end_id, sub_batch_size, pause_ms) set to 0.
        def generic_instance(batch_table:, batch_column:, job_arguments: [], connection:)
          new(
            batch_table: batch_table, batch_column: batch_column,
            job_arguments: job_arguments, connection: connection,
            start_id: 0, end_id: 0, sub_batch_size: 0, pause_ms: 0
          )
        end

        # Number of job arguments declared with `job_arguments`; 0 until that macro is used.
        def job_arguments_count
          0
        end

        # Class macro: declares the operation name under which each sub-batch is
        # instrumented by #each_sub_batch and #distinct_each_batch.
        def operation_name(operation)
          define_method(:operation_name) do
            operation
          end
        end

        # Class macro: declares named readers for the positional job arguments. The i-th
        # name returns the i-th element of the `job_arguments:` array given to #initialize.
        # It also redefines `job_arguments_count` on the calling class to return the number
        # of declared names.
        def job_arguments(*args)
          args.each_with_index do |arg, index|
            define_method(arg) do
              @job_arguments[index]
            end
          end

          define_singleton_method(:job_arguments_count) do
            args.count
          end
        end

        # Class macro: narrows the batch. `scope` is instance_exec'd on the job instance
        # with the base relation as its only argument. It is expected to return a relation.
        def scope_to(scope)
          define_method(:filter_batch) do |relation|
            instance_exec(relation, &scope)
          end
        end

        # With a present (non-blank) argument, stores it as this class's feature category.
        # Otherwise returns the stored category, falling back to DEFAULT_FEATURE_CATEGORY.
        def feature_category(feature_category_name = nil)
          if feature_category_name.present?
            set_class_attribute(:feature_category, feature_category_name)
          else
            get_class_attribute(:feature_category) || DEFAULT_FEATURE_CATEGORY
          end
        end
      end

      # start_id/end_id are inclusive bounds on batch_column (see #base_relation).
      # sub_batch_size is the default chunk size for sub-batching.
      # pause_ms is the sleep between sub-batches, in milliseconds.
      # job_arguments are the positional values read by readers declared with `job_arguments`.
      def initialize(
        start_id:, end_id:, batch_table:, batch_column:, sub_batch_size:, pause_ms:, job_arguments: [], connection:
      )
        @start_id = start_id
        @end_id = end_id
        @batch_table = batch_table
        @batch_column = batch_column
        @sub_batch_size = sub_batch_size
        @pause_ms = pause_ms
        @job_arguments = job_arguments
        @connection = connection
      end

      # Identity filter; replaced per subclass by the `scope_to` class macro.
      def filter_batch(relation)
        relation
      end

      # Entry point of the job.
      #
      # @raise [NotImplementedError] always, unless overridden by a subclass.
      def perform
        raise NotImplementedError, "subclasses of #{self.class.name} must implement #{__method__}"
      end

      # Memoized metrics collector used to instrument every sub-batch operation.
      def batch_metrics
        @batch_metrics ||= Gitlab::Database::BackgroundMigration::BatchMetrics.new
      end

      private

      attr_reader :start_id, :end_id, :batch_table, :batch_column, :sub_batch_size, :pause_ms, :connection

      # Iterates the job's batch in sub-batches and yields each sub-batch relation.
      #
      # The batch is #base_relation narrowed by #filter_batch (see `scope_to`), then by the
      # optional +batching_scope+ callable. Sub-batches come from `each_batch`, by default
      # over batch_column in chunks of sub_batch_size. +batching_arguments+ overrides those
      # defaults. Each yield is instrumented and followed by a pause of pause_ms.
      def each_sub_batch(batching_arguments: {}, batching_scope: nil, &block)
        batch_relation = filter_sub_batch(filter_batch(base_relation), batching_scope)

        batch_relation.each_batch(**sub_batching_arguments(batching_arguments)) do |sub_batch|
          instrument_sub_batch_and_pause(sub_batch, &block)
        end
      end

      # Like #each_sub_batch, but iterates the unfiltered #base_relation with
      # `distinct_each_batch`.
      #
      # @raise [RuntimeError] when `scope_to` filters change the relation, because those
      #   filters would be ignored by this iteration.
      def distinct_each_batch(batching_arguments: {}, &block)
        unfiltered = base_relation

        # filter_batch gets its own relation so a scope cannot alter the one iterated below.
        if unfiltered != filter_batch(base_relation)
          raise 'distinct_each_batch can not be used when additional filters are defined with scope_to'
        end

        unfiltered.distinct_each_batch(**sub_batching_arguments(batching_arguments)) do |sub_batch|
          instrument_sub_batch_and_pause(sub_batch, &block)
        end
      end

      # Default sub-batching options (batch_column, sub_batch_size), overridden by any keys
      # present in the caller-supplied +batching_arguments+.
      def sub_batching_arguments(batching_arguments)
        { column: batch_column, of: sub_batch_size }.merge(batching_arguments)
      end

      # Yields +sub_batch+ inside a batch_metrics instrumentation of #operation_name, then
      # sleeps pause_ms milliseconds (negative values are treated as 0).
      def instrument_sub_batch_and_pause(sub_batch)
        batch_metrics.instrument_operation(operation_name) do
          yield sub_batch
        end

        sleep([pause_ms, 0].max * 0.001)
      end

      # Rows of batch_table whose batch_column lies in start_id..end_id (inclusive), queried
      # through a model built by define_batchable_model on the job's connection.
      # NOTE(review): the model is built on every call; presumably cheap, but confirm.
      def base_relation
        define_batchable_model(batch_table, connection: connection)
          .where(batch_column => start_id..end_id)
      end

      # Applies the optional +batching_scope+ callable to +relation+; returns +relation+
      # unchanged when no scope is given.
      def filter_sub_batch(relation, batching_scope = nil)
        return relation unless batching_scope

        batching_scope.call(relation)
      end

      # Fallback used when the subclass did not declare one with the `operation_name` macro.
      #
      # @raise [RuntimeError] always.
      def operation_name
        raise('Operation name is required, please define it with `operation_name`')
      end
    end
  end
end
|