# frozen_string_literal: true

module BackgroundMigration
  module SingleDatabaseWorker
    extend ActiveSupport::Concern

    include ApplicationWorker

    # Maximum number of times a job will try to obtain the exclusive lease
    # before giving up.
    MAX_LEASE_ATTEMPTS = 5

    included do
      data_consistency :always

      sidekiq_options retry: 3

      feature_category :database
      urgency :throttled
      loggable_arguments 0, 1
    end
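
    # As a usage sketch, a worker includes this concern and defines which
    # database its migrations run against. The worker class and database name
    # below are illustrative assumptions, not part of this module:
    #
    #   class ExampleBackgroundMigrationWorker # hypothetical worker
    #     include BackgroundMigration::SingleDatabaseWorker
    #
    #     def self.tracking_database
    #       :main # assumed database name
    #     end
    #   end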

    class_methods do
      # The minimum amount of time between processing two jobs of the same
      # migration class.
      #
      # This interval is set to 2 minutes so autovacuuming and other
      # maintenance related tasks have plenty of time to clean up after a
      # migration has been performed.
      def minimum_interval
        2.minutes.to_i
      end

      # The database against which this worker's migrations are tracked.
      # Classes including this concern must override this method.
      def tracking_database
        raise NotImplementedError, "#{self.name} does not implement #{__method__}"
      end
    end

    # Performs the background migration.
    #
    # See Gitlab::BackgroundMigration.perform for more information.
    #
    # class_name - The class name of the background migration to run.
    # arguments - The arguments to pass to the migration class.
    # lease_attempts - The number of times we will try to obtain an exclusive
    #   lease on the class before giving up. See the MR for more discussion:
    #   https://gitlab.com/gitlab-org/gitlab/-/merge_requests/45298#note_434304956
    def perform(class_name, arguments = [], lease_attempts = MAX_LEASE_ATTEMPTS)
      job_coordinator.with_shared_connection do
        perform_with_connection(class_name, arguments, lease_attempts)
      end
    end
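
    # For illustration, a scheduler could then enqueue jobs through the
    # standard Sidekiq API provided by ApplicationWorker; the migration class
    # name and arguments here are assumptions for the example:
    #
    #   ExampleBackgroundMigrationWorker.perform_async('BackfillExample', [1, 100]) # hypothetical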

    private

    def tracking_database
      self.class.tracking_database
    end

    def job_coordinator
      @job_coordinator ||= Gitlab::BackgroundMigration.coordinator_for_database(tracking_database)
    end
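
    # Runs the migration under the coordinator's shared database connection,
    # guarded by an exclusive lease so that jobs of the same migration class
    # are not processed concurrently or too frequently.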
    def perform_with_connection(class_name, arguments, lease_attempts)
      with_context(caller_id: class_name.to_s) do
        retried = lease_attempts != MAX_LEASE_ATTEMPTS
        attempts_left = lease_attempts - 1
        should_perform, ttl = perform_and_ttl(class_name, attempts_left, retried)

        break if should_perform.nil?

        if should_perform
          job_coordinator.perform(class_name, arguments)
        else
          # If the lease could not be obtained, it means either another process
          # is running a migration of this class or we ran one recently. In this
          # case we reschedule the job so that it is picked up again around the
          # time the lease expires.
          self.class
            .perform_in(ttl || self.class.minimum_interval, class_name, arguments, attempts_left)
        end
      end
    end
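
    # Returns a two-element array: whether the migration should be performed
    # now, and the TTL of the exclusive lease (used to time the reschedule).
    # A [nil, nil] return signals that the job should be dropped entirely.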
    def perform_and_ttl(class_name, attempts_left, retried)
      # In test environments `perform_in` runs right away. This can then lead
      # to stack level errors in the above `#perform`. To work around this we
      # just perform the migration right away in the test environment.
      return [true, nil] if always_perform?

      lease = lease_for(class_name, retried)
      lease_obtained = !!lease.try_obtain
      healthy_db = healthy_database?
      perform = lease_obtained && healthy_db

      database_unhealthy_counter.increment(db_config_name: tracking_database) if lease_obtained && !healthy_db

      # When the DB is unhealthy or the lease can't be obtained after several
      # tries, give up on the job and log a warning. Otherwise we could end up
      # in an infinite rescheduling loop. Jobs can be tracked in the database
      # with the use of Gitlab::Database::BackgroundMigrationJob.
      if !perform && attempts_left < 0
        msg = if !lease_obtained
                'Job could not get an exclusive lease after several tries. Giving up.'
              else
                'Database was unhealthy after several tries. Giving up.'
              end

        Sidekiq.logger.warn(class: class_name, message: msg, job_id: jid)

        return [nil, nil]
      end

      [perform, lease.ttl]
    end

    def lease_for(class_name, retried)
      Gitlab::ExclusiveLease
        .new(lease_key_for(class_name, retried), timeout: self.class.minimum_interval)
    end

    def lease_key_for(class_name, retried)
      key = "#{self.class.name}:#{class_name}"
      # We use a different exclusive lease key for retried jobs to allow them
      # to run concurrently with the scheduled jobs.
      # See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/68763 for more information.
      key += ":retried" if retried
      key
    end

    def always_perform?
      Rails.env.test?
    end

    # Returns true if the database is healthy enough to allow the migration to
    # be performed.
    def healthy_database?
      !Postgresql::ReplicationSlot.lag_too_great?
    end

    def database_unhealthy_counter
      Gitlab::Metrics.counter(
        :background_migration_database_health_reschedules,
        'The number of times a background migration is rescheduled because the database is unhealthy.'
      )
    end
  end
end