summaryrefslogtreecommitdiff
path: root/lib/gitlab/background_migration/job_coordinator.rb
blob: c440db58b9424538a3832d48f3455d445a52793e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# frozen_string_literal: true

module Gitlab
  module BackgroundMigration
    # Class responsible for executing background migrations based on the given database.
    #
    # Chooses the correct worker class when selecting jobs from the queue based on the
    # convention of how the queues and worker classes are set up for each database.
    #
    # Also provides a database connection to the correct tracking database.
    #
    # Instances are obtained through `.for_tracking_database`.
    class JobCoordinator # rubocop:disable Metrics/ClassLength
      class << self
        # Builds a coordinator for the worker that tracks `tracking_database`.
        #
        # @param tracking_database [String, Symbol] database name; the lookup
        #   table uses indifferent access, so either form matches
        # @return [JobCoordinator]
        # @raise [ArgumentError] when no configured worker tracks that database
        def for_tracking_database(tracking_database)
          worker_class = worker_for_tracking_database[tracking_database]

          if worker_class.nil?
            raise ArgumentError, "The '#{tracking_database}' must be one of #{worker_for_tracking_database.keys}"
          end

          new(worker_class)
        end

        private

        # Every worker class that can run background migrations.
        def worker_classes
          @worker_classes ||= [
            ::BackgroundMigrationWorker,
            ::BackgroundMigration::CiDatabaseWorker
          ].freeze
        end

        # Frozen map of tracking database name => worker class, limited to the
        # workers whose tracking database passes `Gitlab::Database.has_config?`.
        # Memoized on the class.
        def worker_for_tracking_database
          @worker_for_tracking_database ||= worker_classes
            .select { |worker_class| Gitlab::Database.has_config?(worker_class.tracking_database) }
            .index_by(&:tracking_database)
            .with_indifferent_access
            .freeze
        end
      end

      attr_reader :worker_class

      delegate :minimum_interval, :perform_in, to: :worker_class

      # Name of the Sidekiq queue configured in the worker's `sidekiq_options`.
      def queue
        @queue ||= worker_class.sidekiq_options['queue']
      end

      # Runs the block with Gitlab::Database::SharedModel bound to this
      # coordinator's tracking-database connection.
      def with_shared_connection(&block)
        Gitlab::Database::SharedModel.using_connection(connection, &block)
      end

      # Enumerates Sidekiq jobs whose class is `worker_class`.
      #
      # The sets are only read when the enumerator is iterated, in this order:
      # the scheduled set, the worker's queue and, when `include_dead_jobs` is
      # true, the retry set followed by the dead set.
      #
      # @param include_dead_jobs [Boolean] also search the retry and dead sets
      # @return [Enumerator]
      def pending_jobs(include_dead_jobs: false)
        Enumerator.new do |y|
          sets = [
            Sidekiq::ScheduledSet.new,
            Sidekiq::Queue.new(queue)
          ]

          if include_dead_jobs
            sets << Sidekiq::RetrySet.new
            sets << Sidekiq::DeadSet.new
          end

          # Named `set` so the block parameter does not shadow #queue.
          sets.each do |set|
            set.each do |job|
              y << job if job.klass == worker_class.name
            end
          end
        end
      end

      # Runs pending jobs of `steal_class` inline, inside the shared connection.
      #
      # A job is performed only when `job.delete` returns truthy. If deleting or
      # performing the job raises, the migration is enqueued again through
      # `worker_class.perform_async` and the exception is re-raised.
      #
      # @param steal_class migration class name, compared with `==` against the
      #   first job argument
      # @param retry_dead_jobs [Boolean] also consider the retry and dead sets
      # @yieldparam job the Sidekiq job; a falsy block result skips the job
      def steal(steal_class, retry_dead_jobs: false)
        with_shared_connection do
          pending_jobs(include_dead_jobs: retry_dead_jobs).each do |job|
            migration_class, migration_args = job.args

            next unless migration_class == steal_class
            next if block_given? && !(yield job)

            begin
              perform(migration_class, migration_args) if job.delete
            rescue Exception # rubocop:disable Lint/RescueException
              # Deliberately broader than StandardError: whatever interrupts the
              # inline run, the migration is put back before `raise` propagates it.
              worker_class # enqueue this migration again
                .perform_async(migration_class, migration_args)

              raise
            end
          end
        end
      end

      # Instantiates the migration named `class_name` and calls
      # `#perform(*arguments)` on it inside the shared connection.
      def perform(class_name, arguments)
        with_shared_connection do
          migration_instance_for(class_name).perform(*arguments)
        end
      end

      # Number of this worker's jobs that are enqueued in its queue or waiting
      # in the scheduled set. Retry and dead sets are not counted.
      def remaining
        pending_jobs.count
      end

      # Whether a job for `migration_class` is enqueued or scheduled.
      #
      # NOTE(review): `additional_queues` is accepted but never used; it is kept
      # so existing callers keep working — confirm whether it can be dropped.
      def exists?(migration_class, additional_queues = [])
        enqueued = Sidekiq::Queue.new(queue)
        scheduled = Sidekiq::ScheduledSet.new

        enqueued_job?([enqueued, scheduled], migration_class)
      end

      # Whether a job for `migration_class` is in the Sidekiq dead set.
      def dead_jobs?(migration_class)
        dead_set = Sidekiq::DeadSet.new

        enqueued_job?([dead_set], migration_class)
      end

      # Whether a job for `migration_class` is in the Sidekiq retry set.
      def retrying_jobs?(migration_class)
        retry_set = Sidekiq::RetrySet.new

        enqueued_job?([retry_set], migration_class)
      end

      # Builds an instance of the named migration. Subclasses of
      # Gitlab::BackgroundMigration::BaseJob receive the tracking-database
      # connection; any other class is built without arguments.
      def migration_instance_for(class_name)
        migration_class = migration_class_for(class_name)

        if migration_class < Gitlab::BackgroundMigration::BaseJob
          migration_class.new(connection: connection)
        else
          migration_class.new
        end
      end

      # Resolves `class_name` to a constant of Gitlab::BackgroundMigration.
      # `inherit = false` keeps the lookup from falling back to ancestors or
      # top-level constants.
      #
      # @raise [NameError] when no such constant exists in the namespace
      def migration_class_for(class_name)
        Gitlab::BackgroundMigration.const_get(class_name, false)
      end

      # Whether any of `queues` (Sidekiq queues or sets) holds a job of
      # `worker_class` whose first argument equals `migration_class`.
      def enqueued_job?(queues, migration_class)
        queues.any? do |set|
          set.any? do |job|
            job.klass == worker_class.name && job.args.first == migration_class
          end
        end
      end

      private

      def initialize(worker_class)
        @worker_class = worker_class
      end

      # Connection of the base model registered for the worker's tracking
      # database. `fetch` fails loudly instead of returning nil when that
      # database has no registered base model.
      def connection
        @connection ||= Gitlab::Database
          .database_base_models
          .fetch(worker_class.tracking_database)
          .connection
      end
    end
  end
end