summaryrefslogtreecommitdiff
path: root/storage/mroonga/vendor/groonga/plugins/sharding/logical_table_remove.rb
blob: 3353d6c3a83eb0c9e87dc7297961537cc5991ea0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
module Groonga
  module Sharding
    class LogicalTableRemoveCommand < Command
      register("logical_table_remove",
               [
                 "logical_table",
                 "shard_key",
                 "min",
                 "min_border",
                 "max",
                 "max_border",
                 "dependent",
                 "force",
               ])

      def run_body(input)
        @dependent = (input[:dependent] == "yes")
        @force = (input[:force] == "yes")

        enumerator = LogicalEnumerator.new("logical_table_remove", input)

        success = true
        enumerator.each do |shard, shard_range|
          remove_shard(shard, shard_range, enumerator.target_range)
        end
        writer.write(success)
      end

      private
      def remove_shard(shard, shard_range, target_range)
        cover_type = target_range.cover_type(shard_range)
        return if cover_type == :none

        shard_key = shard.key
        if shard_key.nil?
          if @force
            context.clear_error
          else
            message =
              "[logical_table_remove] shard_key doesn't exist: " +
              "<#{shard.key_name}>"
            raise InvalidArgument, message
          end
        end
        table = shard.table

        if cover_type == :all or ((table.nil? or shard_key.nil?) and @force)
          remove_table(shard, table)
          return
        end

        expression_builder = RangeExpressionBuilder.new(shard_key,
                                                        target_range)
        case cover_type
        when :partial_min
          remove_records(table) do |expression|
            expression_builder.build_partial_min(expression)
          end
          remove_table(shard, table) if table.empty?
        when :partial_max
          remove_records(table) do |expression|
            expression_builder.build_partial_max(expression)
          end
          remove_table(shard, table) if table.empty?
        when :partial_min_and_max
          remove_records(table) do |expression|
            expression_builder.build_partial_min_and_max(expression)
          end
          remove_table(shard, table) if table.empty?
        end
      end

      def collect_referenced_table_ids_from_index_ids(index_ids,
                                                      referenced_table_ids)
        database = context.database
        index_ids.each do |index_id|
          index = context[index_id]
          if index.nil?
            context.clear_error
            index_name = database[index_id]
            lexicon_name = index_name.split(".", 2)[0]
            lexicon_id = database[lexicon_name]
            referenced_table_ids << lexicon_id if lexicon_id
          else
            referenced_table_ids << index.domain_id
          end
        end
      end

      def collect_referenced_table_ids_from_column_name(column_name,
                                                        referenced_table_ids)
        database = context.database
        column_id = database[column_name]
        database.each_raw do |id, cursor|
          next if ID.builtin?(id)
          next if id == column_id

          context.open_temporary(id) do |object|
            if object.nil?
              context.clear_error
              next
            end

            case object
            when IndexColumn
              if object.source_ids.include?(column_id)
                collect_referenced_table_ids_from_index_ids([id],
                                                            referenced_table_ids)
              end
            end
          end
        end
      end

      def collect_referenced_table_ids_from_column(column,
                                                   referenced_table_ids)
        range = column.range
        case range
        when nil
          context.clear_error
        when Table
          referenced_table_ids << range.id
          collect_referenced_table_ids_from_index_ids(range.index_ids,
                                                      referenced_table_ids)
        end
        collect_referenced_table_ids_from_index_ids(column.index_ids,
                                                    referenced_table_ids)
      end

      def collect_referenced_table_ids_from_column_names(column_names)
        referenced_table_ids = []
        column_names.each do |column_name|
          column = context[column_name]
          if column.nil?
            context.clear_error
            collect_referenced_table_ids_from_column_name(column_name,
                                                          referenced_table_ids)
          else
            collect_referenced_table_ids_from_column(column,
                                                     referenced_table_ids)
          end
        end
        referenced_table_ids
      end

      def collect_referenced_table_ids(shard, table)
        return [] unless @dependent

        column_names = nil
        if table
          begin
            column_names = table.columns.collect(&:name)
          rescue
            context.clear_error
          end
        end
        if column_names.nil?
          prefix = "#{shard.table_name}."
          column_names = []
          context.database.each_name(:prefix => prefix) do |column_name|
            column_names << column_name
          end
        end

        collect_referenced_table_ids_from_column_names(column_names)
      end

      def remove_table(shard, table)
        if table.nil?
          unless @force
            if context.rc == Context::RC::SUCCESS.to_i
              error_class = InvalidArgument
            else
              rc = Context::RC.find(context.rc)
              error_class = rc.error_class
            end
            message = "[logical_table_remove] table is broken: " +
                      "<#{shard.table_name}>: #{context.error_message}"
            raise error_class, message
          end
          context.clear_error
        end

        referenced_table_ids = collect_referenced_table_ids(shard, table)

        if table.nil?
          remove_table_force(shard.table_name)
        else
          options = {:dependent => @dependent}
          if @force
            begin
              table.remove(options)
            rescue
              context.clear_error
              table.close
              remove_table_force(shard.table_name)
            end
          else
            table.remove(options)
          end
        end

        remove_referenced_tables(shard, referenced_table_ids)
      end

      def remove_table_force(table_name)
        database = context.database

        prefix = "#{table_name}."
        database.each_raw(:prefix => prefix) do |id, cursor|
          column = context[id]
          if column.nil?
            context.clear_error
            column_name = cursor.key
            remove_column_force(column_name)
            table = context[table_name]
            if table.nil?
              context.clear_error
            else
              table.close
            end
          else
            remove_column(column)
          end
        end

        table_id = database[table_name]
        return if table_id.nil?

        database.each_raw do |id, cursor|
          next if ID.builtin?(id)
          next if id == table_id

          context.open_temporary(id) do |object|
            if object.nil?
              context.clear_error
              next
            end

            case object
            when Table
              if object.domain_id == table_id
                begin
                  object.remove(:dependent => @dependent)
                rescue
                  context.clear_error
                  reference_table_name = object.name
                  object.close
                  remove_table_force(reference_table_name)
                end
              end
            when Column
              if object.range_id == table_id
                remove_column(object)
              end
            end
          end
        end

        Object.remove_force(table_name)
      end

      def remove_column(column)
        begin
          column.remove(:dependent => @dependent)
        rescue
          context.clear_error
          column_name = column.name
          column.close
          remove_column_force(column_name)
        end
      end

      def remove_column_force(column_name)
        database = context.database

        column_id = database[column_name]

        column = context[column_id]
        if column.nil?
          context.clear_error
        else
          column.index_ids.each do |id|
            index_column = context[id]
            if index_column.nil?
              context.clear_error
              index_column_name = database[id]
              remove_column_force(index_column_name)
            else
              remove_column(index_column)
            end
          end
          column.close
        end

        Object.remove_force(column_name)
      end

      def remove_referenced_tables(shard, referenced_table_ids)
        return if referenced_table_ids.empty?

        database = context.database
        shard_suffix = shard.range_data.to_suffix
        referenced_table_ids.uniq.each do |referenced_table_id|
          referenced_table_name = database[referenced_table_id]
          next if referenced_table_name.nil?
          next unless referenced_table_name.end_with?(shard_suffix)

          referenced_table = context[referenced_table_id]
          if referenced_table.nil?
            context.clear_error
            if @force
              Object.remove_force(referenced_table_name)
            end
            next
          end

          if @force
            begin
              referenced_table.remove(:dependent => @dependent)
            rescue
              context.clear_error
              referenced_table.close
              remove_table_force(referenced_table_name)
            end
          else
            referenced_table.remove(:dependent => @dependent)
          end
        end
      end

      def remove_records(table)
        expression = nil

        begin
          expression = Expression.create(table)
          yield(expression)
          table.delete(:expression => expression)
        ensure
          expression.close if expression
        end
      end
    end
  end
end