summaryrefslogtreecommitdiff
path: root/storage/mroonga/vendor/groonga/plugins/sharding/logical_count.rb
blob: 610dae834d324007e44ec3158b2eb033a67245e7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
module Groonga
  module Sharding
    class LogicalCountCommand < Command
      # Calls the class-level +register+ hook (defined outside this file,
      # presumably by Command) with the command name "logical_count".
      # The second argument looks like the list of argument names the
      # command accepts -- run_body reads the "filter" entry as
      # input[:filter]; confirm the rest against Command.register.
      argument_names = %w[logical_table shard_key
                          min min_border
                          max max_border
                          filter]
      register("logical_count", argument_names)

      # Counts the records of every shard that LogicalEnumerator yields for
      # +input+ and writes the grand total through +writer+.
      #
      # +input+ is handed as-is to LogicalEnumerator; the only key read here
      # directly is +:filter+, which is forwarded unchanged to
      # count_n_records for each shard.
      def run_body(input)
        shards = LogicalEnumerator.new("logical_count", input)
        condition = input[:filter]

        n_records = 0
        shards.each do |table, shard_key, shard_range|
          n_shard_records = count_n_records(table, condition,
                                            shard_key, shard_range,
                                            shards.target_range)
          n_records += n_shard_records
        end
        writer.write(n_records)
      end

      private
      # Counts the records of one shard +table+ for the requested
      # +target_range+, applying +filter+ when it is not nil.
      #
      # The strategy depends on target_range.cover_type(shard_range):
      # * :none - returns 0 without touching the table.
      # * :all  - returns table.size when there is no filter, otherwise
      #   counts through an expression filled in by build_all.
      # * :partial_min, :partial_max, :partial_min_and_max - when there is
      #   no filter and shard_key.find_index(Operator::LESS) reports an
      #   index, counts through that index with count_n_records_in_range,
      #   leaving the unbounded side as nil. Otherwise counts through the
      #   matching build_partial_* expression.
      #
      # Any other cover type falls through both case statements and yields
      # nil.
      def count_n_records(table, filter,
                          shard_key, shard_range,
                          target_range)
        coverage = target_range.cover_type(shard_range)
        return 0 if coverage == :none

        builder = RangeExpressionBuilder.new(shard_key, target_range, filter)
        unfiltered = filter.nil?

        if coverage == :all
          return table.size if unfiltered
          return filtered_count_n_records(table) { |expression|
            builder.build_all(expression)
          }
        end

        # The index shortcut is only attempted when there is no filter.
        index = nil
        if unfiltered
          index_info = shard_key.find_index(Operator::LESS)
          index = index_info.index if index_info
        end

        if index
          case coverage
          when :partial_min
            count_n_records_in_range(index,
                                     target_range.min, target_range.min_border,
                                     nil, nil)
          when :partial_max
            count_n_records_in_range(index,
                                     nil, nil,
                                     target_range.max, target_range.max_border)
          when :partial_min_and_max
            count_n_records_in_range(index,
                                     target_range.min, target_range.min_border,
                                     target_range.max, target_range.max_border)
          end
        else
          case coverage
          when :partial_min
            filtered_count_n_records(table) { |expression|
              builder.build_partial_min(expression)
            }
          when :partial_max
            filtered_count_n_records(table) { |expression|
              builder.build_partial_max(expression)
            }
          when :partial_min_and_max
            filtered_count_n_records(table) { |expression|
              builder.build_partial_min_and_max(expression)
            }
          end
        end
      end

      # Counts the records of +table+ that match an expression the caller
      # fills in through the given block.
      #
      # Creates an expression with Expression.create(table), yields it so
      # the caller can build the condition, runs table.select with it and
      # returns the size of the selected table.
      #
      # Both temporary objects are closed before returning, also when the
      # block, select or size raises. The cleanup is nested so that the
      # expression is still closed when closing the selected table itself
      # raises; a flat ensure would skip expression.close in that case.
      def filtered_count_n_records(table)
        expression = nil
        filtered_table = nil

        begin
          expression = Expression.create(table)
          yield(expression)
          filtered_table = table.select(expression)
          filtered_table.size
        ensure
          begin
            filtered_table.close if filtered_table
          ensure
            expression.close if expression
          end
        end
      end

      # Counts entries of +range_index+ whose key lies between +min+ and
      # +max+.
      #
      # A TableCursor is opened on range_index.table with the BY_KEY flag
      # plus one comparison flag per bound: a border of :include adds
      # GE (min) / LE (max), :exclude adds GT (min) / LT (max), and any
      # other value adds nothing. count_n_records passes nil for both the
      # value and the border of a side it leaves unbounded.
      #
      # The result is whatever the nested open calls hand back from
      # index_cursor.count (assumes both +open+ methods return their
      # block's value -- confirm against their definitions).
      def count_n_records_in_range(range_index,
                                   min, min_border, max, max_border)
        cursor_flags = TableCursorFlags::BY_KEY

        if min_border == :include
          cursor_flags |= TableCursorFlags::GE
        elsif min_border == :exclude
          cursor_flags |= TableCursorFlags::GT
        end

        if max_border == :include
          cursor_flags |= TableCursorFlags::LE
        elsif max_border == :exclude
          cursor_flags |= TableCursorFlags::LT
        end

        index_table = range_index.table
        TableCursor.open(index_table,
                         :min => min,
                         :max => max,
                         :flags => cursor_flags) do |key_cursor|
          IndexCursor.open(key_cursor, range_index) { |entries| entries.count }
        end
      end
    end
  end
end