1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
# frozen_string_literal: true
module Gitlab
module BackgroundMigration
# Class responsible for executing background migrations based on the given database.
#
# Chooses the correct worker class when selecting jobs from the queue based on the
# convention of how the queues and worker classes are set up for each database.
#
# Also provides a database connection to the correct tracking database.
class JobCoordinator # rubocop:disable Metrics/ClassLength
class << self
# Lists the worker classes the coordinator knows about.
#
# The array is built on first use, frozen, and cached in @worker_classes.
def worker_classes
  return @worker_classes if @worker_classes

  @worker_classes = [BackgroundMigrationWorker].freeze
end
# Lookup table from tracking database name to the worker class serving it.
#
# Built from worker_classes, keyed by each worker's tracking_database,
# converted with with_indifferent_access, frozen, and memoized.
def worker_for_tracking_database
  @worker_for_tracking_database ||= begin
    workers_by_database = worker_classes.index_by(&:tracking_database)
    workers_by_database.with_indifferent_access.freeze
  end
end
# Builds a coordinator for the given tracking database.
#
# tracking_database - key looked up in worker_for_tracking_database.
#
# Returns a new instance wrapping the matching worker class.
# Raises ArgumentError listing the known database names when no worker is
# registered under the given key.
def for_tracking_database(tracking_database)
  matched_worker = worker_for_tracking_database[tracking_database]
  return new(matched_worker) unless matched_worker.nil?

  known_databases = worker_for_tracking_database.keys.join(', ')
  raise ArgumentError, "tracking_database must be one of [#{known_databases}]"
end
end
attr_reader :worker_class
delegate :minimum_interval, :perform_in, to: :worker_class
# Name of the Sidekiq queue used by the worker class, read from the
# 'queue' entry of its sidekiq_options and cached once it is truthy.
def queue
  return @queue if @queue

  @queue = worker_class.sidekiq_options['queue']
end
# Runs the given block through Gitlab::Database::SharedModel.using_connection,
# passing this coordinator's connection, and returns whatever that call returns.
def with_shared_connection(&block)
  shared_model = Gitlab::Database::SharedModel
  shared_model.using_connection(connection, &block)
end
# Pulls pending jobs for a migration class out of Sidekiq and runs them inline.
#
# steal_class     - migration class name compared against each job's first argument.
# retry_dead_jobs - when true, the retry set and dead set are scanned as well as
#                   the scheduled set and this coordinator's queue.
#
# An optional block receives each matching job; the job is skipped when the
# block returns a falsy value.
#
# A job is only performed when job.delete returns a truthy value. If performing
# raises anything, the migration is pushed back with perform_async and the
# original exception is re-raised.
def steal(steal_class, retry_dead_jobs: false)
  with_shared_connection do
    job_sources = [Sidekiq::ScheduledSet.new, Sidekiq::Queue.new(queue)]
    job_sources.push(Sidekiq::RetrySet.new, Sidekiq::DeadSet.new) if retry_dead_jobs

    job_sources.each do |job_source|
      job_source.each do |job|
        migration_class, migration_args = job.args

        # Only jobs of our worker that carry the requested migration qualify.
        next unless job.klass == worker_class.name && migration_class == steal_class
        next if block_given? && !(yield job)

        begin
          perform(migration_class, migration_args) if job.delete
        rescue Exception # rubocop:disable Lint/RescueException
          # The job was already deleted, so enqueue this migration again
          # before letting the failure propagate.
          worker_class.perform_async(migration_class, migration_args)
          raise
        end
      end
    end
  end
end
# Instantiates the named migration and runs it inside with_shared_connection.
#
# class_name - name passed to migration_instance_for.
# arguments  - list splatted into the migration's perform call.
def perform(class_name, arguments)
  with_shared_connection do
    migration = migration_instance_for(class_name)
    migration.perform(*arguments)
  end
end
# Counts the jobs belonging to this coordinator's worker class across the
# coordinator's queue and the scheduled set.
def remaining
  job_sets = [Sidekiq::Queue.new(queue), Sidekiq::ScheduledSet.new]

  job_sets.sum do |job_set|
    job_set.count { |job| job.klass == worker_class.name }
  end
end
# Reports whether a job for the given migration class is waiting in this
# coordinator's queue or in the scheduled set.
#
# migration_class   - migration class name to look for.
# additional_queues - accepted but not used by this method.
def exists?(migration_class, additional_queues = [])
  pending_sets = [Sidekiq::Queue.new(queue), Sidekiq::ScheduledSet.new]

  enqueued_job?(pending_sets, migration_class)
end
# Reports whether the Sidekiq dead set holds a job for the given migration class.
def dead_jobs?(migration_class)
  enqueued_job?([Sidekiq::DeadSet.new], migration_class)
end
# Reports whether the Sidekiq retry set holds a job for the given migration class.
def retrying_jobs?(migration_class)
  enqueued_job?([Sidekiq::RetrySet.new], migration_class)
end
# Builds an instance of the named migration class.
#
# Subclasses of Gitlab::BackgroundMigration::BaseJob are constructed with the
# coordinator's connection; any other class is constructed without arguments.
def migration_instance_for(class_name)
  job_class = migration_class_for(class_name)
  return job_class.new unless job_class < Gitlab::BackgroundMigration::BaseJob

  job_class.new(connection: connection)
end
# Resolves a migration class by name inside Gitlab::BackgroundMigration.
#
# The lookup passes inherit = false to const_get, so constants that only exist
# in ancestors or at the top level are not returned.
def migration_class_for(class_name)
  namespace = Gitlab::BackgroundMigration
  namespace.const_get(class_name, false)
end
# Reports whether any of the given job collections contains a job that belongs
# to this coordinator's worker class and whose first argument equals
# migration_class.
def enqueued_job?(queues, migration_class)
  queues.any? do |job_source|
    job_source.any? do |job|
      next false unless job.klass == worker_class.name

      job.args.first == migration_class
    end
  end
end
private
# Stores the worker class this coordinator operates on; it is exposed
# through the worker_class reader.
def initialize(worker_class)
  @worker_class = worker_class
end
# Connection of the base model registered for the worker's tracking database.
#
# When no base model is registered under that name, the base model registered
# under Gitlab::Database::PRIMARY_DATABASE_NAME is used instead. The previous
# code passed the primary database *name* as the fetch default and then called
# .connection on that name, so the fallback could never succeed.
#
# NOTE(review): assumes database_base_models has an entry keyed by
# PRIMARY_DATABASE_NAME — confirm; a KeyError is raised otherwise.
#
# The result is memoized in @connection.
def connection
  @connection ||= begin
    base_models = Gitlab::Database.database_base_models
    base_model = base_models.fetch(worker_class.tracking_database) do
      base_models.fetch(Gitlab::Database::PRIMARY_DATABASE_NAME)
    end

    base_model.connection
  end
end
end
end
end
|