summaryrefslogtreecommitdiff
path: root/lib/gitlab/database/concurrent_reindex.rb
blob: 485ab35e55de263d232ab6f7897444162fd8033f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# frozen_string_literal: true

module Gitlab
  module Database
    class ConcurrentReindex
      include Gitlab::Utils::StrongMemoize
      include MigrationHelpers

      ReindexError = Class.new(StandardError)

      PG_IDENTIFIER_LENGTH = 63
      TEMPORARY_INDEX_PREFIX = 'tmp_reindex_'
      REPLACED_INDEX_PREFIX = 'old_reindex_'

      attr_reader :index_name, :logger

      # Sets up a reindex operation for a single index.
      #
      # @param index_name [String] name of the index to rebuild
      # @param logger [#debug, #info, #error] receives progress and failure messages
      def initialize(index_name, logger:)
        @logger = logger
        @index_name = index_name
      end

      # Rebuilds the index: creates a replacement under a temporary name,
      # swaps the two indexes by renaming them, and finally drops whichever
      # index is left holding the temporary name.
      #
      # @raise [ReindexError] when the index does not exist, is UNIQUE, the
      #   replacement turns out invalid, the lock retries are exhausted, or a
      #   database error occurs while creating or swapping the replacement
      def execute
        raise ReindexError, "index #{index_name} does not exist" unless index_exists?
        raise ReindexError, 'UNIQUE indexes are currently not supported' if index_unique?

        # An interrupted earlier run may have left its replacement index behind.
        logger.debug("dropping dangling index from previous run: #{replacement_index_name}")
        remove_replacement_index

        begin
          create_replacement_index

          if replacement_index_valid?
            swap_replacement_index
          else
            reason = 'replacement index was created as INVALID'
            logger.error("#{reason}, cleaning up")
            raise ReindexError, "failed to reindex #{index_name}: #{reason}"
          end
        rescue Gitlab::Database::WithLockRetries::AttemptsExhaustedError => error
          logger.error('failed to obtain the required database locks to swap the indexes, cleaning up')
          raise ReindexError, error.message
        rescue ActiveRecord::ActiveRecordError, PG::Error => error
          logger.error("database error while attempting reindex of #{index_name}: #{error.message}")
          raise ReindexError, error.message
        ensure
          # After a successful swap the temporary name refers to the old index;
          # after a failure it refers to the unused replacement. Drop it either way.
          logger.info("dropping unneeded replacement index: #{replacement_index_name}")
          remove_replacement_index
        end
      end

      private

      # Database connection used for every statement of this operation; fetched
      # from ActiveRecord::Base on first use and reused afterwards.
      def connection
        return @connection if @connection

        @connection = ActiveRecord::Base.connection
      end

      # Temporary name under which the replacement index is built: the index
      # name with TEMPORARY_INDEX_PREFIX prepended, passed through
      # constrained_index_name. Computed once and cached.
      def replacement_index_name
        return @replacement_index_name if @replacement_index_name

        @replacement_index_name = constrained_index_name(TEMPORARY_INDEX_PREFIX)
      end

      # Catalog information for the index being rebuilt, as returned by
      # find_index (nil when no such index was found). The lookup goes through
      # strong_memoize under the :index key.
      def index
        strong_memoize(:index) { find_index(index_name) }
      end

      # @return [Boolean] false when the index lookup came back nil, true otherwise
      def index_exists?
        return false if index.nil?

        true
      end

      # Reports the `indisunique` flag of the index record.
      # Callers must check index_exists? first: the record is nil for a missing index.
      def index_unique?
        current_index = index
        current_index.indisunique
      end

      # Builds "<prefix><index_name>" truncated to fit PG_IDENTIFIER_LENGTH.
      #
      # PostgreSQL limits identifiers by bytes, not characters, so the name is
      # cut by byte length. If the cut lands inside a multibyte character, the
      # partial character is dropped so the result stays validly encoded.
      # For ASCII-only names this is identical to cutting by characters.
      #
      # @param prefix [String] prefix to prepend to the index name
      # @return [String] a name of at most PG_IDENTIFIER_LENGTH bytes
      def constrained_index_name(prefix)
        full_name = "#{prefix}#{index_name}"
        return full_name if full_name.bytesize <= PG_IDENTIFIER_LENGTH

        full_name.byteslice(0, PG_IDENTIFIER_LENGTH).scrub('')
      end

      # Creates the replacement index by replaying the original index
      # definition as CREATE INDEX CONCURRENTLY under the replacement name.
      # The statement runs inside disable_statement_timeout.
      def create_replacement_index
        # The index name is matched literally (String pattern) rather than
        # interpolated into a Regexp, so names containing regexp metacharacters
        # are still replaced. The block form inserts the new name verbatim,
        # without backslash-reference expansion.
        create_replacement_index_statement = index.indexdef
          .sub(/CREATE INDEX/, 'CREATE INDEX CONCURRENTLY')
          .sub(index_name.to_s) { replacement_index_name }

        logger.info("creating replacement index #{replacement_index_name}")
        logger.debug("replacement index definition: #{create_replacement_index_statement}")

        disable_statement_timeout do
          connection.execute(create_replacement_index_statement)
        end
      end

      # Whether the replacement index exists and is flagged valid (`indisvalid`).
      #
      # A replacement that cannot be found is reported as not valid instead of
      # failing with NoMethodError on nil, so the caller handles it like any
      # other unusable replacement.
      #
      # @return [Boolean, nil] the `indisvalid` value, or false when the index is missing
      def replacement_index_valid?
        replacement = find_index(replacement_index_name)
        return false unless replacement

        replacement.indisvalid
      end

      # Looks up an index of the `public` schema by name in the PostgreSQL catalogs.
      #
      # The join to pg_indexes is constrained on the schema as well as the
      # index name. Index names are only unique per schema, so joining on the
      # name alone could pick up the definition of a same-named index from
      # another schema.
      #
      # @param index_name [String] name of the index to look up
      # @return [OpenStruct, nil] record exposing `indisunique`, `indisvalid`
      #   and `indexdef`, or nil when no row was found
      def find_index(index_name)
        record = connection.select_one(<<~SQL)
          SELECT
            pg_index.indisunique,
            pg_index.indisvalid,
            pg_indexes.indexdef
          FROM pg_index
          INNER JOIN pg_class ON pg_class.oid = pg_index.indexrelid
          INNER JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid
          INNER JOIN pg_indexes ON pg_class.relname = pg_indexes.indexname
            AND pg_indexes.schemaname = pg_namespace.nspname
          WHERE pg_namespace.nspname = 'public'
          AND pg_class.relname = #{connection.quote(index_name)}
        SQL

        OpenStruct.new(record) if record
      end

      # Exchanges the names of the original and the replacement index with
      # three renames run inside with_lock_retries:
      # original -> parking name, replacement -> original name,
      # parking name -> replacement name.
      # Afterwards the replacement name refers to the old index.
      def swap_replacement_index
        parking_name = constrained_index_name(REPLACED_INDEX_PREFIX)
        original_name = index_name
        temporary_name = replacement_index_name

        logger.info("swapping replacement index #{temporary_name} with #{original_name}")

        with_lock_retries do
          rename_index(original_name, parking_name)
          rename_index(temporary_name, original_name)
          rename_index(parking_name, temporary_name)
        end
      end

      # Renames an index.
      #
      # Both names are quoted as SQL identifiers through the connection, so
      # names that need quoting (mixed case, special characters) are handled
      # and a name can never be read as SQL.
      #
      # @param old_index_name [String] current name of the index
      # @param new_index_name [String] name the index should have afterwards
      def rename_index(old_index_name, new_index_name)
        quoted_old_name = connection.quote_column_name(old_index_name)
        quoted_new_name = connection.quote_column_name(new_index_name)

        connection.execute("ALTER INDEX #{quoted_old_name} RENAME TO #{quoted_new_name}")
      end

      # Drops the index currently holding the replacement name, if there is one.
      # Uses DROP INDEX CONCURRENTLY IF EXISTS inside disable_statement_timeout.
      #
      # The name is quoted as an SQL identifier through the connection, so a
      # replacement name derived from an index name that needs quoting (mixed
      # case, special characters) still refers to the right index.
      def remove_replacement_index
        quoted_name = connection.quote_column_name(replacement_index_name)

        disable_statement_timeout do
          connection.execute("DROP INDEX CONCURRENTLY IF EXISTS #{quoted_name}")
        end
      end

      # Runs the block through Gitlab::Database::WithLockRetries, configured
      # with this class and logger, asking it to raise once its attempts are
      # exhausted.
      #
      # The options are passed as explicit keyword arguments rather than as a
      # positional Hash. Under Ruby 3 a positional Hash is no longer converted
      # to keywords, while explicit keywords are accepted both by a
      # keyword-argument initializer and by one taking a trailing options Hash.
      # NOTE(review): the WithLockRetries initializer signature is not visible
      # here - confirm it accepts `klass:` and `logger:`.
      def with_lock_retries(&block)
        Gitlab::Database::WithLockRetries
          .new(klass: self.class, logger: logger)
          .run(raise_on_exhaustion: true, &block)
      end
    end
  end
end