1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
# frozen_string_literal: true
module ContainerRegistry
module Migration
# Worker that drives container repository migrations forward.
#
# While holding an exclusive lease, #perform repeatedly retries an aborted
# migration or, when none was handled, starts the pre-import of the next
# repository that is ready for import. It keeps going until the worker is no
# longer runnable, the loop deadline is reached, or an iteration handled
# nothing.
class EnqueuerWorker
include ApplicationWorker
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
include Gitlab::Utils::StrongMemoize
include ExclusiveLeaseGuard
# 30 minutes expressed in seconds. Returned by #lease_timeout and reused
# below as the deduplication TTL.
DEFAULT_LEASE_TIMEOUT = 30.minutes.to_i.freeze
data_consistency :always
feature_category :container_registry
urgency :low
# The deduplication TTL is the same value as the exclusive lease timeout.
deduplicate :until_executing, ttl: DEFAULT_LEASE_TIMEOUT
idempotent!
# Entry point of the worker.
#
# Inside the block given to try_obtain_lease, repositories are handled one at
# a time: an aborted migration is retried first and, only when that did not
# handle anything, the next repository gets its pre-import started. The loop
# ends as soon as the worker is no longer runnable, the loop deadline is
# reached, or an iteration did not handle any repository.
def perform
  try_obtain_lease do
    while runnable? && Time.zone.now < loop_deadline
      # nothing was handled in this iteration: stop the loop
      break unless handle_aborted_migration || handle_next_migration

      # another iteration may follow, so the memoized lookups must be reset
      %i[next_repository next_aborted_repository last_step_completed_repository].each do |memoized_name|
        clear_memoization(memoized_name)
      end
    end
  end
end
# Schedules one asynchronous run of this worker by delegating to perform_async.
def self.enqueue_a_job
perform_async
end
private
# Retries the migration of the next aborted repository, if there is one.
#
# Returns nil when there is no aborted repository, true when the retry was
# triggered without raising, and false when a StandardError was raised (the
# error is reported through Gitlab::ErrorTracking). The repository info is
# logged in every case via the ensure clause.
#
# NOTE(review): the rescue and ensure clauses call next_aborted_repository
# again; if the lookup itself raised, it presumably runs (and may raise)
# again - confirm against StrongMemoize semantics.
def handle_aborted_migration
  handled = nil

  begin
    if next_aborted_repository
      next_aborted_repository.retry_aborted_migration
      handled = true
    end
  rescue StandardError => error
    Gitlab::ErrorTracking.log_exception(error, next_aborted_repository_id: next_aborted_repository&.id)
    handled = false
  ensure
    log_repository_info(next_aborted_repository, import_type: 'retry')
  end

  handled
end
# Starts the pre-import of the next repository, if there is one.
#
# Returns nil when there is no repository or when start_pre_import returns a
# falsy value, true when the repository was skipped for its tag count or its
# pre-import was started, and false when a StandardError was raised. On error
# the exception is reported through Gitlab::ErrorTracking and the import of
# the repository is aborted. The repository info is logged in every case via
# the ensure clause.
def handle_next_migration
  handled = nil

  begin
    if next_repository
      # A repository skipped for its tag count still counts as processed
      # (its migration_state is changed), so it short-circuits to true.
      handled = true if tag_count_too_high? || next_repository.start_pre_import
    end
  rescue StandardError => error
    Gitlab::ErrorTracking.log_exception(error, next_repository_id: next_repository&.id)
    next_repository&.abort_import
    handled = false
  ensure
    log_repository_info(next_repository, import_type: 'next')
  end

  handled
end
# Tells whether the next repository has more tags than the configured
# maximum and, if so, marks its import as skipped (reason: too_many_tags).
#
# A maximum of 0 disables the check: false is returned without reading the
# repository's tag count.
def tag_count_too_high?
  limit_disabled = migration.max_tags_count == 0
  return false if limit_disabled

  over_limit = next_repository.tags_count > migration.max_tags_count
  next_repository.skip_import(reason: :too_many_tags) if over_limit
  over_limit
end
# True when current_capacity is strictly lower than maximum_capacity.
def below_capacity?
current_capacity < maximum_capacity
end
# Tells whether enough time has elapsed since the last completed import step.
#
# Returns true when the configured waiting time is 0, when no repository with
# a last_import_step_done_at timestamp is available, or when that timestamp
# is older than the current time minus the waiting time.
def waiting_time_passed?
  waiting_time = migration.enqueue_waiting_time
  return true if waiting_time == 0

  last_step_done_at = last_step_completed_repository&.last_import_step_done_at
  return true unless last_step_done_at

  last_step_done_at < Time.zone.now - waiting_time
end
# Tells whether the worker may handle a repository right now.
#
# Three conditions are checked in order: the migration is enabled, the worker
# is below capacity, and the waiting time has passed. The first failing
# condition is recorded with log_extra_metadata_on_done and false is
# returned; later conditions are not evaluated.
def runnable?
  if !migration.enabled?
    log_extra_metadata_on_done(:migration_enabled, false)
    false
  elsif !below_capacity?
    log_extra_metadata_on_done(:max_capacity_setting, maximum_capacity)
    log_extra_metadata_on_done(:below_capacity, false)
    false
  elsif !waiting_time_passed?
    log_extra_metadata_on_done(:waiting_time_passed, false)
    log_extra_metadata_on_done(:current_waiting_time_setting, migration.enqueue_waiting_time)
    false
  else
    true
  end
end
# Number of container repositories whose migration state is pre_importing,
# pre_import_done or importing.
def current_capacity
  counted_states = %w[pre_importing pre_import_done importing]

  ContainerRepository.with_migration_states(counted_states).count
end
# Upper bound compared against current_capacity, read from migration.capacity.
def maximum_capacity
migration.capacity
end
# Memoized lookup of the first repository returned by the
# ready_for_import.ordered scopes. #perform clears this memoization before
# each new loop iteration.
def next_repository
strong_memoize(:next_repository) do
# Using .limit(25)[0] instead of take here. Using LIMIT 1 or LIMIT 2 caused the query planner to
# use an inefficient sequential scan instead of picking an index. LIMIT 25 works around
# this issue.
# See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/87733 and
# https://gitlab.com/gitlab-org/gitlab/-/merge_requests/90735 for details.
ContainerRepository.ready_for_import.ordered.limit(25)[0] # rubocop:disable CodeReuse/ActiveRecord
end
end
# Memoized lookup of the first repository in the 'import_aborted' migration
# state, following the ordered scope. #perform clears this memoization before
# each new loop iteration.
def next_aborted_repository
strong_memoize(:next_aborted_repository) do
# NOTE(review): .limit(25)[0] is presumably used for the same query planner
# reason as the one documented in next_repository - confirm.
ContainerRepository.with_migration_state('import_aborted').ordered.limit(25)[0] # rubocop:disable CodeReuse/ActiveRecord
end
end
# Memoized lookup of the first repository returned by the
# recently_done_migration_step scope; its last_import_step_done_at is what
# waiting_time_passed? compares against. #perform clears this memoization
# before each new loop iteration.
# NOTE(review): the ordering is defined by the scope, which is not visible here.
def last_step_completed_repository
strong_memoize(:last_step_completed_repository) do
ContainerRepository.recently_done_migration_step.first
end
end
# Shorthand for ::ContainerRegistry::Migration, which this worker queries for
# enabled?, capacity, max_tags_count and enqueue_waiting_time.
def migration
::ContainerRegistry::Migration
end
# Enqueues another run of this worker, but only while below capacity.
# Returns nil when at capacity.
# NOTE(review): no caller of this method is visible in this file - possibly a
# leftover; confirm before relying on it.
def re_enqueue_if_capacity
  self.class.enqueue_a_job if below_capacity?
end
# Writes an info-level log entry whose payload is structured_payload(extras).
def log_info(extras)
logger.info(structured_payload(extras))
end
# Logs the id, path and migration state of the given repository, merged on
# top of the given extras. When the repository's import was skipped, the
# skip reason is included as well. Does nothing when repository is nil.
def log_repository_info(repository, extras = {})
  return unless repository

  skipped_reason_key = :container_repository_migration_skipped_reason
  details = {
    container_repository_id: repository.id,
    container_repository_path: repository.path,
    container_repository_migration_state: repository.migration_state
  }
  details[skipped_reason_key] = repository.migration_skipped_reason if repository.import_skipped?

  log_info(extras.merge(details))
end
# Memoized point in time, 250 seconds after the first call, that #perform
# compares the current time against to decide whether to keep looping.
def loop_deadline
  strong_memoize(:loop_deadline) { 250.seconds.from_now }
end
# Key identifying the exclusive lease taken in #perform.
# used by ExclusiveLeaseGuard
def lease_key
'container_registry:migration:enqueuer_worker'
end
# Timeout of the exclusive lease: DEFAULT_LEASE_TIMEOUT (30 minutes, in seconds).
# used by ExclusiveLeaseGuard
def lease_timeout
DEFAULT_LEASE_TIMEOUT
end
end
end
end
|