1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
|
# frozen_string_literal: true
require 'yaml'
module Backup
class Repositories
def initialize(progress, strategy:, max_concurrency: 1, max_storage_concurrency: 1)
@progress = progress
@strategy = strategy
@max_concurrency = max_concurrency
@max_storage_concurrency = max_storage_concurrency
end
def dump
strategy.start(:create)
# gitaly-backup is designed to handle concurrency on its own. So we want
# to avoid entering the buggy concurrency code here when gitaly-backup
# is enabled.
if (max_concurrency <= 1 && max_storage_concurrency <= 1) || !strategy.parallel_enqueue?
return enqueue_consecutive
end
if max_concurrency < 1 || max_storage_concurrency < 1
puts "GITLAB_BACKUP_MAX_CONCURRENCY and GITLAB_BACKUP_MAX_STORAGE_CONCURRENCY must have a value of at least 1".color(:red)
exit 1
end
check_valid_storages!
semaphore = Concurrent::Semaphore.new(max_concurrency)
errors = Queue.new
threads = Gitlab.config.repositories.storages.keys.map do |storage|
Thread.new do
Rails.application.executor.wrap do
enqueue_storage(storage, semaphore, max_storage_concurrency: max_storage_concurrency)
rescue StandardError => e
errors << e
end
end
end
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
threads.each(&:join)
end
raise errors.pop unless errors.empty?
ensure
strategy.finish!
end
def restore
strategy.start(:restore)
enqueue_consecutive
ensure
strategy.finish!
cleanup_snippets_without_repositories
restore_object_pools
end
def enabled
true
end
def human_name
_('repositories')
end
private
attr_reader :progress, :strategy, :max_concurrency, :max_storage_concurrency
def check_valid_storages!
repository_storage_klasses.each do |klass|
if klass.excluding_repository_storage(Gitlab.config.repositories.storages.keys).exists?
raise Error, "repositories.storages in gitlab.yml does not include all storages used by #{klass}"
end
end
end
def repository_storage_klasses
[ProjectRepository, SnippetRepository]
end
def enqueue_consecutive
enqueue_consecutive_projects
enqueue_consecutive_snippets
end
def enqueue_consecutive_projects
project_relation.find_each(batch_size: 1000) do |project|
enqueue_project(project)
end
end
def enqueue_consecutive_snippets
Snippet.find_each(batch_size: 1000) { |snippet| enqueue_snippet(snippet) }
end
def enqueue_storage(storage, semaphore, max_storage_concurrency:)
errors = Queue.new
queue = InterlockSizedQueue.new(1)
threads = Array.new(max_storage_concurrency) do
Thread.new do
Rails.application.executor.wrap do
while container = queue.pop
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
semaphore.acquire
end
begin
enqueue_container(container)
rescue StandardError => e
errors << e
break
ensure
semaphore.release
end
end
end
end
end
enqueue_records_for_storage(storage, queue, errors)
raise errors.pop unless errors.empty?
ensure
queue.close
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
threads.each(&:join)
end
end
def enqueue_container(container)
case container
when Project
enqueue_project(container)
when Snippet
enqueue_snippet(container)
end
end
def enqueue_project(project)
strategy.enqueue(project, Gitlab::GlRepository::PROJECT)
strategy.enqueue(project, Gitlab::GlRepository::WIKI)
strategy.enqueue(project, Gitlab::GlRepository::DESIGN)
end
def enqueue_snippet(snippet)
strategy.enqueue(snippet, Gitlab::GlRepository::SNIPPET)
end
def enqueue_records_for_storage(storage, queue, errors)
records_to_enqueue(storage).each do |relation|
relation.find_each(batch_size: 100) do |project|
break unless errors.empty?
queue.push(project)
end
end
end
def records_to_enqueue(storage)
[projects_in_storage(storage), snippets_in_storage(storage)]
end
def projects_in_storage(storage)
project_relation.id_in(ProjectRepository.for_repository_storage(storage).select(:project_id))
end
def project_relation
Project.includes(:route, :group, namespace: :owner)
end
def snippets_in_storage(storage)
Snippet.id_in(SnippetRepository.for_repository_storage(storage).select(:snippet_id))
end
def restore_object_pools
PoolRepository.includes(:source_project).find_each do |pool|
progress.puts " - Object pool #{pool.disk_path}..."
pool.source_project ||= pool.member_projects.first&.root_of_fork_network
unless pool.source_project
progress.puts " - Object pool #{pool.disk_path}... " + "[SKIPPED]".color(:cyan)
next
end
pool.state = 'none'
pool.save
pool.schedule
end
end
# Snippets without a repository should be removed because they failed to import
# due to having invalid repositories
def cleanup_snippets_without_repositories
invalid_snippets = []
Snippet.find_each(batch_size: 1000).each do |snippet|
response = Snippets::RepositoryValidationService.new(nil, snippet).execute
next if response.success?
snippet.repository.remove
progress.puts("Snippet #{snippet.full_path} can't be restored: #{response.message}")
invalid_snippets << snippet.id
end
Snippet.id_in(invalid_snippets).delete_all
end
class InterlockSizedQueue < SizedQueue
extend ::Gitlab::Utils::Override
override :pop
def pop(*)
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
super
end
end
override :push
def push(*)
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
super
end
end
end
end
end
Backup::Repositories.prepend_mod_with('Backup::Repositories')
|