summaryrefslogtreecommitdiff
path: root/lib/gitlab/batch_pop_queueing.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/batch_pop_queueing.rb')
-rw-r--r--lib/gitlab/batch_pop_queueing.rb113
1 files changed, 0 insertions, 113 deletions
diff --git a/lib/gitlab/batch_pop_queueing.rb b/lib/gitlab/batch_pop_queueing.rb
deleted file mode 100644
index 103ce644f2b..00000000000
--- a/lib/gitlab/batch_pop_queueing.rb
+++ /dev/null
@@ -1,113 +0,0 @@
-# frozen_string_literal: true
-
-module Gitlab
- ##
- # This class is a queuing system for processing expensive tasks in an atomic manner
- # with batch poping to let you optimize the total processing time.
- #
- # In usual queuing system, the first item started being processed immediately
- # and the following items wait until the next items have been popped from the queue.
- # On the other hand, this queueing system, the former part is same, however,
- # it pops the enqueued items as batch. This is especially useful when you want to
- # drop redundant items from the queue in order to process important items only,
- # thus it's more efficient than the traditional queueing system.
- #
- # Caveats:
- # - The order of the items are not guaranteed because of `sadd` (Redis Sets).
- #
- # Example:
- # ```
- # class TheWorker
- # def perform
- # result = Gitlab::BatchPopQueueing.new('feature', 'queue').safe_execute([item]) do |items_in_queue|
- # item = extract_the_most_important_item_from(items_in_queue)
- # expensive_process(item)
- # end
- #
- # if result[:status] == :finished && result[:new_items].present?
- # item = extract_the_most_important_item_from(items_in_queue)
- # TheWorker.perform_async(item.id)
- # end
- # end
- # end
- # ```
- #
- class BatchPopQueueing
- attr_reader :namespace, :queue_id
-
- EXTRA_QUEUE_EXPIRE_WINDOW = 1.hour
- MAX_COUNTS_OF_POP_ALL = 1000
-
- # Initialize queue
- #
- # @param [String] namespace The namespace of the exclusive lock and queue key. Typically, it's a feature name.
- # @param [String] queue_id The identifier of the queue.
- # @return [Boolean]
- def initialize(namespace, queue_id)
- raise ArgumentError if namespace.empty? || queue_id.empty?
-
- @namespace = namespace
- @queue_id = queue_id
- end
-
- ##
- # Execute the given block in an exclusive lock.
- # If there is the other thread has already working on the block,
- # it enqueues the items without processing the block.
- #
- # @param [Array<String>] new_items New items to be added to the queue.
- # @param [Time] lock_timeout The timeout of the exclusive lock. Generally, this value should be longer than the maximum prosess timing of the given block.
- # @return [Hash]
- # - status => One of the `:enqueued` or `:finished`.
- # - new_items => Newly enqueued items during the given block had been processed.
- #
- # NOTE: If an exception is raised in the block, the poppped items will not be recovered.
- # We should NOT re-enqueue the items in this case because it could end up in an infinite loop.
- def safe_execute(new_items, lock_timeout: 10.minutes, &block)
- enqueue(new_items, lock_timeout + EXTRA_QUEUE_EXPIRE_WINDOW)
-
- lease = Gitlab::ExclusiveLease.new(lock_key, timeout: lock_timeout)
-
- return { status: :enqueued } unless uuid = lease.try_obtain
-
- begin
- all_args = pop_all
-
- yield all_args if block
-
- { status: :finished, new_items: peek_all }
- ensure
- Gitlab::ExclusiveLease.cancel(lock_key, uuid)
- end
- end
-
- private
-
- def lock_key
- @lock_key ||= "batch_pop_queueing:lock:#{namespace}:#{queue_id}"
- end
-
- def queue_key
- @queue_key ||= "batch_pop_queueing:queue:#{namespace}:#{queue_id}"
- end
-
- def enqueue(items, expire_time)
- Gitlab::Redis::Queues.with do |redis|
- redis.sadd(queue_key, items)
- redis.expire(queue_key, expire_time.to_i)
- end
- end
-
- def pop_all
- Gitlab::Redis::Queues.with do |redis|
- redis.spop(queue_key, MAX_COUNTS_OF_POP_ALL)
- end
- end
-
- def peek_all
- Gitlab::Redis::Queues.with do |redis|
- redis.smembers(queue_key)
- end
- end
- end
-end