summaryrefslogtreecommitdiff
path: root/app/models/namespaces/traversal/linear_scopes.rb
blob: 792964a6c7f2d4a637fef3fe8b6bbfccc1b57eb9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# frozen_string_literal: true

module Namespaces
  module Traversal
    module LinearScopes
      extend ActiveSupport::Concern

      include AsCte

      class_methods do
        # When filtering namespaces by the traversal_ids column to compile a
        # list of namespace IDs, it can be faster to reference the ID in
        # traversal_ids than the primary key ID column.
        def as_ids
          return super unless use_traversal_ids?

          select(Arel.sql('namespaces.traversal_ids[array_length(namespaces.traversal_ids, 1)]').as('id'))
        end

        def roots
          return super unless use_traversal_ids_roots?

          root_ids = all.select("#{quoted_table_name}.traversal_ids[1]").distinct
          unscoped.where(id: root_ids)
        end

        def self_and_ancestors(include_self: true, upto: nil, hierarchy_order: nil)
          return super unless use_traversal_ids_for_ancestor_scopes?

          self_and_ancestors_from_inner_join(
            include_self: include_self,
            upto: upto, hierarchy_order:
            hierarchy_order
          )
        end

        def self_and_ancestor_ids(include_self: true)
          return super unless use_traversal_ids_for_ancestor_scopes?

          self_and_ancestors(include_self: include_self).as_ids
        end

        def self_and_descendants(include_self: true)
          return super unless use_traversal_ids_for_descendants_scopes?

          self_and_descendants_with_comparison_operators(include_self: include_self)
        end

        def self_and_descendant_ids(include_self: true)
          return super unless use_traversal_ids_for_descendants_scopes?

          self_and_descendants(include_self: include_self).as_ids
        end

        def self_and_hierarchy
          return super unless use_traversal_ids_for_self_and_hierarchy_scopes?

          unscoped.from_union([all.self_and_ancestors, all.self_and_descendants(include_self: false)])
        end

        def order_by_depth(hierarchy_order)
          return all unless hierarchy_order

          depth_order = hierarchy_order == :asc ? :desc : :asc

          all
            .select(Namespace.default_select_columns, 'array_length(traversal_ids, 1) as depth')
            .order(depth: depth_order, id: :asc)
        end

        # Produce a query of the form: SELECT * FROM namespaces;
        #
        # When we have queries that break this SELECT * format we can run in to errors.
        # For example `SELECT DISTINCT on(...)` will fail when we chain a `.count` c
        def normal_select
          unscoped.from(all, :namespaces)
        end

        private

        def use_traversal_ids?
          Feature.enabled?(:use_traversal_ids)
        end

        def use_traversal_ids_roots?
          Feature.enabled?(:use_traversal_ids_roots) &&
          use_traversal_ids?
        end

        def use_traversal_ids_for_ancestor_scopes?
          Feature.enabled?(:use_traversal_ids_for_ancestor_scopes) &&
          use_traversal_ids?
        end

        def use_traversal_ids_for_descendants_scopes?
          Feature.enabled?(:use_traversal_ids_for_descendants_scopes) &&
          use_traversal_ids?
        end

        def use_traversal_ids_for_self_and_hierarchy_scopes?
          Feature.enabled?(:use_traversal_ids_for_self_and_hierarchy_scopes) &&
            use_traversal_ids?
        end

        def self_and_ancestors_from_inner_join(include_self: true, upto: nil, hierarchy_order: nil)
          base_cte = all.reselect('namespaces.traversal_ids').as_cte(:base_ancestors_cte)

          unnest = if include_self
                     base_cte.table[:traversal_ids]
                   else
                     base_cte_traversal_ids = 'base_ancestors_cte.traversal_ids'
                     traversal_ids_range = "1:array_length(#{base_cte_traversal_ids},1)-1"
                     Arel.sql("#{base_cte_traversal_ids}[#{traversal_ids_range}]")
                   end

          ancestor_subselect = "SELECT DISTINCT #{unnest_func(unnest).to_sql} FROM base_ancestors_cte"
          ancestors_join = <<~SQL
            INNER JOIN (#{ancestor_subselect}) AS ancestors(ancestor_id) ON namespaces.id = ancestors.ancestor_id
          SQL

          namespaces = Arel::Table.new(:namespaces)

          records = unscoped
            .with(base_cte.to_arel)
            .from(namespaces)
            .joins(ancestors_join)
            .order_by_depth(hierarchy_order)

          if upto
            upto_ancestor_ids = unscoped.where(id: upto).select(unnest_func(Arel.sql('traversal_ids')))
            records = records.where.not(id: upto_ancestor_ids)
          end

          records
        end

        def self_and_descendants_with_comparison_operators(include_self: true)
          base = all.select(:id, :traversal_ids)
          base_cte = base.as_cte(:descendants_base_cte)

          namespaces = Arel::Table.new(:namespaces)

          superset_cte = self.superset_cte(base_cte.table.name)
          withs = [base_cte.to_arel, superset_cte.to_arel]
          # Order is important. namespace should be last to handle future joins.
          froms = [superset_cte.table, namespaces]

          base_ref = froms.first

          # Bound the search space to ourselves (optional) and descendants.
          #
          # WHERE next_traversal_ids_sibling(base_cte.traversal_ids) > namespaces.traversal_ids
          records = unscoped
            .distinct
            .with(*withs)
            .from(froms)
            .where(next_sibling_func(base_ref[:traversal_ids]).gt(namespaces[:traversal_ids]))

          #   AND base_cte.traversal_ids <= namespaces.traversal_ids
          if include_self
            records.where(base_ref[:traversal_ids].lteq(namespaces[:traversal_ids]))
          else
            records.where(base_ref[:traversal_ids].lt(namespaces[:traversal_ids]))
          end
        end

        def next_sibling_func(*args)
          Arel::Nodes::NamedFunction.new('next_traversal_ids_sibling', args)
        end

        def unnest_func(*args)
          Arel::Nodes::NamedFunction.new('unnest', args)
        end

        def superset_cte(base_name)
          superset_sql = <<~SQL
            SELECT d1.traversal_ids
            FROM #{base_name} d1
            WHERE NOT EXISTS (
              SELECT 1
              FROM #{base_name} d2
              WHERE d2.id = ANY(d1.traversal_ids)
                AND d2.id <> d1.id
            )
          SQL

          Gitlab::SQL::CTE.new(:superset, superset_sql, materialized: false)
        end
      end
    end
  end
end