summaryrefslogtreecommitdiff
path: root/lib/gitlab/sidekiq_config/worker_matcher.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/sidekiq_config/worker_matcher.rb')
-rw-r--r--lib/gitlab/sidekiq_config/worker_matcher.rb86
1 files changed, 86 insertions, 0 deletions
diff --git a/lib/gitlab/sidekiq_config/worker_matcher.rb b/lib/gitlab/sidekiq_config/worker_matcher.rb
new file mode 100644
index 00000000000..fe5ac10c65a
--- /dev/null
+++ b/lib/gitlab/sidekiq_config/worker_matcher.rb
@@ -0,0 +1,86 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqConfig
+ class WorkerMatcher
+ WILDCARD_MATCH = '*'
+ QUERY_OR_OPERATOR = '|'
+ QUERY_AND_OPERATOR = '&'
+ QUERY_CONCATENATE_OPERATOR = ','
+ QUERY_TERM_REGEX = %r{^(\w+)(!?=)([\w:#{QUERY_CONCATENATE_OPERATOR}]+)}.freeze
+
+ QUERY_PREDICATES = {
+ feature_category: :to_sym,
+ has_external_dependencies: lambda { |value| value == 'true' },
+ name: :to_s,
+ resource_boundary: :to_sym,
+ tags: :to_sym,
+ urgency: :to_sym
+ }.freeze
+
+ QueryError = Class.new(StandardError)
+ InvalidTerm = Class.new(QueryError)
+ UnknownOperator = Class.new(QueryError)
+ UnknownPredicate = Class.new(QueryError)
+
+ def initialize(query_string)
+ @match_lambda = query_string_to_lambda(query_string)
+ end
+
+ def match?(worker_metadata)
+ @match_lambda.call(worker_metadata)
+ end
+
+ private
+
+ def query_string_to_lambda(query_string)
+ return lambda { |_worker| true } if query_string.strip == WILDCARD_MATCH
+
+ or_clauses = query_string.split(QUERY_OR_OPERATOR).map do |and_clauses_string|
+ and_clauses_predicates = and_clauses_string.split(QUERY_AND_OPERATOR).map do |term|
+ predicate_for_term(term)
+ end
+
+ lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } }
+ end
+
+ lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } }
+ end
+
+ def predicate_for_term(term)
+ match = term.match(QUERY_TERM_REGEX)
+
+ raise InvalidTerm.new("Invalid term: #{term}") unless match
+
+ _, lhs, op, rhs = *match
+
+ predicate_for_op(op, predicate_factory(lhs, rhs.split(QUERY_CONCATENATE_OPERATOR)))
+ end
+
+ def predicate_for_op(op, predicate)
+ case op
+ when '='
+ predicate
+ when '!='
+ lambda { |worker| !predicate.call(worker) }
+ else
+ # This is unreachable because InvalidTerm will be raised instead, but
+ # keeping it allows to guard against that changing in future.
+ raise UnknownOperator.new("Unknown operator: #{op}")
+ end
+ end
+
+ def predicate_factory(lhs, values)
+ values_block = QUERY_PREDICATES[lhs.to_sym]
+
+ raise UnknownPredicate.new("Unknown predicate: #{lhs}") unless values_block
+
+ lambda do |queue|
+ comparator = Array(queue[lhs.to_sym]).to_set
+
+ values.map(&values_block).to_set.intersect?(comparator)
+ end
+ end
+ end
+ end
+end