path: root/lib/gitlab/database/partitioning_migration_helpers/backfill_partitioned_table.rb
# frozen_string_literal: true

module Gitlab
  module Database
    module PartitioningMigrationHelpers
      # Class that will generically copy data from a given table into its corresponding partitioned table
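      #
      # Jobs are normally enqueued by the partitioning migration helpers and
      # picked up by the background migration framework; a direct call is
      # sketched below with hypothetical IDs and table names:
      #
      #   BackfillPartitionedTable.new.perform(
      #     1, 50_000,              # start_id, stop_id for this batch
      #     'audit_events',         # hypothetical source table
      #     'audit_events_part',    # hypothetical partitioned copy
      #     'id'                    # key column used to scope the batch
      #   )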
      class BackfillPartitionedTable
        include ::Gitlab::Database::DynamicModelHelpers

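        # Rows copied per sub-batch, and the pause between sub-batches that
        # throttles write load on the database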
        SUB_BATCH_SIZE = 2_500
        PAUSE_SECONDS = 0.25

        def perform(start_id, stop_id, source_table, partitioned_table, source_column)
          if transaction_open?
            raise "Aborting job to backfill partitioned #{source_table} table! Do not run this job in a transaction block!"
          end

          unless table_exists?(partitioned_table)
            logger.warn "exiting backfill migration because partitioned table #{partitioned_table} does not exist. " \
              "This could be due to the migration being rolled back after migration jobs were enqueued in sidekiq"
            return
          end

          bulk_copy = BulkCopy.new(source_table, partitioned_table, source_column)
          parent_batch_relation = relation_scoped_to_range(source_table, source_column, start_id, stop_id)

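          # Walk the parent range in sub-batches; each iteration copies the
          # MIN..MAX window of the key column, then sleeps briefly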
          parent_batch_relation.each_batch(of: SUB_BATCH_SIZE) do |sub_batch|
            sub_start_id, sub_stop_id = sub_batch.pluck(Arel.sql("MIN(#{source_column}), MAX(#{source_column})")).first

            bulk_copy.copy_between(sub_start_id, sub_stop_id)
            sleep(PAUSE_SECONDS)
          end

          mark_jobs_as_succeeded(start_id, stop_id, source_table, partitioned_table, source_column)
        end

        private

        def connection
          ActiveRecord::Base.connection
        end

        def transaction_open?
          connection.transaction_open?
        end

        def table_exists?(table)
          connection.table_exists?(table)
        end

        def logger
          @logger ||= ::Gitlab::BackgroundMigration::Logger.build
        end

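        # define_batchable_model (from DynamicModelHelpers) builds an
        # anonymous ActiveRecord model for the table, so the relation
        # supports each_batch without needing a concrete model class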
        def relation_scoped_to_range(source_table, source_key_column, start_id, stop_id)
          define_batchable_model(source_table).where(source_key_column => start_id..stop_id)
        end

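        # Record the job's arguments as succeeded in the tracking table so
        # that retry logic in the background migration framework can skip
        # this range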
        def mark_jobs_as_succeeded(*arguments)
          BackgroundMigrationJob.mark_all_as_succeeded(self.class.name, arguments)
        end

        # Helper class to copy data between two tables via upserts
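        #
        # For a hypothetical copy from audit_events to audit_events_part
        # keyed on id, the generated statement looks roughly like:
        #
        #   INSERT INTO audit_events_part (id, created_at, ...)
        #   SELECT id, created_at, ...
        #   FROM audit_events
        #   WHERE id BETWEEN 1 AND 2500
        #   FOR UPDATE
        #   ON CONFLICT (id, created_at) DO NOTHING
        #
        # ON CONFLICT ... DO NOTHING makes the copy idempotent, so a job can
        # be retried safely over rows that were already copied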
        class BulkCopy
          DELIMITER = ', '

          attr_reader :source_table, :destination_table, :source_column

          def initialize(source_table, destination_table, source_column)
            @source_table = source_table
            @destination_table = destination_table
            @source_column = source_column
          end

          def copy_between(start_id, stop_id)
            connection.execute(<<~SQL)
              INSERT INTO #{destination_table} (#{column_listing})
              SELECT #{column_listing}
              FROM #{source_table}
              WHERE #{source_column} BETWEEN #{start_id} AND #{stop_id}
              FOR UPDATE
              ON CONFLICT (#{conflict_targets}) DO NOTHING
            SQL
          end

          private

          def connection
            @connection ||= ActiveRecord::Base.connection
          end

          def column_listing
            @column_listing ||= connection.columns(source_table).map(&:name).join(DELIMITER)
          end

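          # PostgreSQL requires the partitioning column to be part of any
          # unique constraint on a partitioned table, so the destination's
          # primary key is typically composite, e.g. (id, created_at)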
          def conflict_targets
            connection.primary_keys(destination_table).join(DELIMITER)
          end
        end
      end
    end
  end
end