1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
# frozen_string_literal: true
module Gitlab
  module Database
    module BackgroundMigration
      # Executes the batched jobs of a batched background migration, either one job
      # at a time (#run_migration_job) or repeatedly until the migration leaves a
      # given status (#run_entire_migration, #finalize).
      class BatchedMigrationRunner
        # Raised by #finalize when the migration is still not finished after all of
        # its remaining jobs have been run.
        FailedToFinalize = Class.new(RuntimeError)

        # Builds a runner for `connection` and finalizes the migration matching the
        # given configuration. See #finalize for details.
        def self.finalize(job_class_name, table_name, column_name, job_arguments, connection:)
          new(connection: connection).finalize(job_class_name, table_name, column_name, job_arguments)
        end

        # connection        - connection passed to the batching strategy and used to
        #                     resolve the schemas when looking up a migration.
        # migration_wrapper - object that receives #perform(batched_job); defaults to a
        #                     BatchedMigrationWrapper built for the same connection.
        def initialize(connection:, migration_wrapper: BatchedMigrationWrapper.new(connection: connection))
          @connection = connection
          @migration_wrapper = migration_wrapper
        end

        # Runs the next batched_job for a batched_background_migration.
        #
        # The batch bounds of the next job are calculated at runtime, based on the migration
        # configuration and the bounds of the most recently created batched_job. Updating the
        # migration configuration will cause future jobs to use the updated batch sizes.
        #
        # The job instance will automatically receive a set of arguments based on the migration
        # configuration. For more details, see the BatchedMigrationWrapper class.
        #
        # When no job can be found or created, the migration is finished or marked as
        # failed instead (see #finish_active_migration).
        #
        # Note that this method is primarily intended to be called by a scheduled worker.
        def run_migration_job(active_migration)
          next_batched_job = find_or_create_next_batched_job(active_migration)

          if next_batched_job
            migration_wrapper.perform(next_batched_job)

            adjust_migration(active_migration)

            active_migration.failure! if next_batched_job.failed? && active_migration.should_stop?
          else
            finish_active_migration(active_migration)
          end
        end

        # Runs all remaining batched_jobs for a batched_background_migration.
        #
        # This method is intended to be used in a test/dev environment to execute the background
        # migration inline. It should NOT be used in a real environment for any non-trivial migrations.
        #
        # Raises a RuntimeError when called outside of the development and test environments.
        def run_entire_migration(migration)
          inline_execution_allowed = Rails.env.development? || Rails.env.test?

          raise 'this method is not intended for use in real environments' unless inline_execution_allowed

          run_migration_while(migration, :active)
        end

        # Finalize migration for given configuration.
        #
        # If the migration is already finished, do nothing. Otherwise change its status to `finalizing`
        # in order to prevent it being picked up by the background worker. Perform all pending jobs,
        # then keep running until migration is finished.
        #
        # A warning is logged (and nothing else happens) when no migration matches the
        # configuration or when the matching migration is already finished.
        #
        # Raises FailedToFinalize when the migration is not finished after running.
        def finalize(job_class_name, table_name, column_name, job_arguments)
          migration = BatchedMigration.find_for_configuration(
            Gitlab::Database.gitlab_schemas_for_connection(connection),
            job_class_name, table_name, column_name, job_arguments
          )

          configuration = {
            job_class_name: job_class_name,
            table_name: table_name,
            column_name: column_name,
            job_arguments: job_arguments
          }

          if migration.nil?
            Gitlab::AppLogger.warn "Could not find batched background migration for the given configuration: #{configuration}"
          elsif migration.finished?
            Gitlab::AppLogger.warn "Batched background migration for the given configuration is already finished: #{configuration}"
          else
            migration.reset_attempts_of_blocked_jobs!
            migration.finalize!

            # Jobs that already exist in the pending state are executed first; the
            # loop below then continues until the status is no longer `finalizing`.
            migration.batched_jobs.with_status(:pending).each { |job| migration_wrapper.perform(job) }

            run_migration_while(migration, :finalizing)

            raise FailedToFinalize, finalize_failure_message(migration) unless migration.finished?
          end
        end

        private

        attr_reader :connection, :migration_wrapper

        # Message attached to FailedToFinalize. The two sentences are separated by a
        # space so the text reads correctly once the literals are joined.
        def finalize_failure_message(migration)
          "Batched migration #{migration.job_class_name} could not be completed and a manual action is required. " \
            "Check the admin panel at (`/admin/background_migrations`) for more details."
        end

        # Creates a batched job for the next batch range when there is one; otherwise
        # falls back to the first retriable job of the migration.
        def find_or_create_next_batched_job(active_migration)
          next_batch_range = find_next_batch_range(active_migration)

          if next_batch_range
            active_migration.create_batched_job!(next_batch_range.min, next_batch_range.max)
          else
            active_migration.batched_jobs.retriable.first
          end
        end

        # Asks the migration's batching strategy for the bounds of the next batch.
        #
        # Returns nil when the strategy yields no bounds, otherwise the result of
        # #clamped_batch_range for those bounds.
        def find_next_batch_range(active_migration)
          batching_strategy = active_migration.batch_class.new(connection: connection)
          batch_min_value = active_migration.next_min_value

          next_batch_bounds = batching_strategy.next_batch(
            active_migration.table_name,
            active_migration.column_name,
            batch_min_value: batch_min_value,
            batch_size: active_migration.batch_size,
            job_arguments: active_migration.job_arguments,
            job_class: active_migration.job_class)

          return if next_batch_bounds.nil?

          clamped_batch_range(active_migration, next_batch_bounds)
        end

        # Turns a [min, max] pair into a Range that never extends past the
        # migration's max_value.
        #
        # Returns nil when the lower bound is already beyond the migration's max_value.
        def clamped_batch_range(active_migration, next_bounds)
          min_value, max_value = next_bounds

          return if min_value > active_migration.max_value

          max_value = max_value.clamp(min_value, active_migration.max_value)

          (min_value..max_value)
        end

        # Closes the migration once no active jobs remain: marks it as failed when
        # any job has the failed status, and as finished otherwise.
        def finish_active_migration(active_migration)
          return if active_migration.batched_jobs.active.exists?

          if active_migration.batched_jobs.with_status(:failed).exists?
            active_migration.failure!
          else
            active_migration.finish!
          end
        end

        # Keeps running jobs for as long as the migration's status_name equals
        # `status`, reloading the migration's last job after every run.
        def run_migration_while(migration, status)
          while migration.status_name == status
            run_migration_job(migration)
            migration.reload_last_job
          end
        end

        # Evaluates the health status for the migration: puts the migration on hold
        # when a Stop signal is returned, otherwise optimizes it.
        def adjust_migration(active_migration)
          signal = HealthStatus.evaluate(active_migration)

          if signal.is_a?(HealthStatus::Signals::Stop)
            active_migration.hold!
          else
            active_migration.optimize!
          end
        end
      end
    end
  end
end
|