summaryrefslogtreecommitdiff
path: root/lib/gitlab/database/background_migration/batched_migration_runner.rb
blob: 59ff9a9744f192ab1c2ad6acc15cf582fb1fa251 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# frozen_string_literal: true

module Gitlab
  module Database
    module BackgroundMigration
      # Executes the batched_jobs of a batched background migration, either one job at a
      # time (#run_migration_job), inline until the migration leaves the active state
      # (#run_entire_migration), or until it has been finalized (#finalize).
      class BatchedMigrationRunner
        # Raised by #finalize when the migration is still not finished after all of its
        # remaining jobs have been run.
        FailedToFinalize = Class.new(RuntimeError)

        # Builds a runner for the given connection and finalizes the migration that matches
        # the given configuration. See #finalize for details.
        def self.finalize(job_class_name, table_name, column_name, job_arguments, connection:)
          new(connection: connection).finalize(job_class_name, table_name, column_name, job_arguments)
        end

        # connection        - passed to the batching strategy and, by default, to the wrapper.
        # migration_wrapper - object responding to `perform(batched_job)`; defaults to a
        #                     BatchedMigrationWrapper built for the same connection.
        def initialize(connection:, migration_wrapper: BatchedMigrationWrapper.new(connection: connection))
          @connection = connection
          @migration_wrapper = migration_wrapper
        end

        # Runs the next batched_job for a batched_background_migration.
        #
        # The batch bounds of the next job are calculated at runtime, based on the migration
        # configuration and the bounds of the most recently created batched_job. Updating the
        # migration configuration will cause future jobs to use the updated batch sizes.
        #
        # The job instance will automatically receive a set of arguments based on the migration
        # configuration. For more details, see the BatchedMigrationWrapper class.
        #
        # When there is neither a new batch to create nor a retriable job, the migration is
        # marked as finished or failed instead (unless jobs are still active).
        #
        # Note that this method is primarily intended to be called by a scheduled worker.
        def run_migration_job(active_migration)
          next_batched_job = find_or_create_next_batched_job(active_migration)

          # Nothing left to create or retry: try to close the migration instead.
          return finish_active_migration(active_migration) unless next_batched_job

          migration_wrapper.perform(next_batched_job)

          active_migration.optimize!
          active_migration.failure! if next_batched_job.failed? && active_migration.should_stop?
        end

        # Runs all remaining batched_jobs for a batched_background_migration.
        #
        # This method is intended to be used in a test/dev environment to execute the background
        # migration inline. It should NOT be used in a real environment for any non-trivial migrations.
        #
        # Raises a RuntimeError when called outside of the development and test environments.
        def run_entire_migration(migration)
          unless Rails.env.development? || Rails.env.test?
            raise 'this method is not intended for use in real environments'
          end

          run_migration_while(migration, :active)
        end

        # Finalize migration for given configuration.
        #
        # If the migration cannot be found or is already finished, log a warning and do nothing.
        # Otherwise change its status to `finalizing` in order to prevent it being picked up by
        # the background worker. Perform all pending jobs, then keep running until the migration
        # leaves the `finalizing` status.
        #
        # Raises FailedToFinalize, with the configuration and the final status in the message,
        # when the migration is not finished afterwards.
        def finalize(job_class_name, table_name, column_name, job_arguments)
          migration = BatchedMigration.find_for_configuration(job_class_name, table_name, column_name, job_arguments)

          configuration = {
            job_class_name: job_class_name,
            table_name: table_name,
            column_name: column_name,
            job_arguments: job_arguments
          }

          if migration.nil?
            Gitlab::AppLogger.warn "Could not find batched background migration for the given configuration: #{configuration}"
          elsif migration.finished?
            Gitlab::AppLogger.warn "Batched background migration for the given configuration is already finished: #{configuration}"
          else
            migration.finalize!
            migration.batched_jobs.with_status(:pending).each { |job| migration_wrapper.perform(job) }

            run_migration_while(migration, :finalizing)

            unless migration.finished?
              # Give whoever sees the failure enough context to identify the migration.
              raise FailedToFinalize,
                "Batched background migration for the given configuration could not be finalized " \
                "(status: #{migration.status_name}): #{configuration}"
            end
          end
        end

        private

        attr_reader :connection, :migration_wrapper

        # Returns the job to run next: a newly created job when another batch range is
        # available, otherwise the first retriable job (nil when there is none).
        def find_or_create_next_batched_job(active_migration)
          if (next_batch_range = find_next_batch_range(active_migration))
            active_migration.create_batched_job!(next_batch_range.min, next_batch_range.max)
          else
            active_migration.batched_jobs.retriable.first
          end
        end

        # Asks the migration's batching strategy for the bounds of the next batch, starting
        # at the migration's next_min_value.
        #
        # Returns a Range, or nil when the strategy yields no bounds or the bounds start
        # beyond the migration's max_value.
        def find_next_batch_range(active_migration)
          batching_strategy = active_migration.batch_class.new(connection: connection)
          batch_min_value = active_migration.next_min_value

          next_batch_bounds = batching_strategy.next_batch(
            active_migration.table_name,
            active_migration.column_name,
            batch_min_value: batch_min_value,
            batch_size: active_migration.batch_size,
            job_arguments: active_migration.job_arguments)

          return if next_batch_bounds.nil?

          clamped_batch_range(active_migration, next_batch_bounds)
        end

        # Converts a [min, max] pair into a Range whose upper bound never exceeds the
        # migration's max_value. Returns nil when the pair starts after max_value.
        def clamped_batch_range(active_migration, next_bounds)
          min_value, max_value = next_bounds

          return if min_value > active_migration.max_value

          max_value = max_value.clamp(min_value, active_migration.max_value)

          (min_value..max_value)
        end

        # Closes the migration once no active jobs remain: it is marked as failed when any
        # job has the failed status, and as finished otherwise.
        def finish_active_migration(active_migration)
          return if active_migration.batched_jobs.active.exists?

          if active_migration.batched_jobs.with_status(:failed).exists?
            active_migration.failure!
          else
            active_migration.finish!
          end
        end

        # Keeps running jobs for as long as the migration's status_name equals `status`.
        def run_migration_while(migration, status)
          while migration.status_name == status
            run_migration_job(migration)

            # NOTE(review): presumably refreshes the cached last job so the next batch
            # starts after the job that was just created - confirm against BatchedMigration.
            migration.reload_last_job
          end
        end
      end
    end
  end
end