diff options
Diffstat (limited to 'lib/gitlab/batch_pop_queueing.rb')
-rw-r--r-- | lib/gitlab/batch_pop_queueing.rb | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/lib/gitlab/batch_pop_queueing.rb b/lib/gitlab/batch_pop_queueing.rb new file mode 100644 index 00000000000..61011abddf5 --- /dev/null +++ b/lib/gitlab/batch_pop_queueing.rb @@ -0,0 +1,112 @@ +# frozen_string_literal: true + +module Gitlab + ## + # This class is a queuing system for processing expensive tasks in an atomic manner + # with batch poping to let you optimize the total processing time. + # + # In usual queuing system, the first item started being processed immediately + # and the following items wait until the next items have been popped from the queue. + # On the other hand, this queueing system, the former part is same, however, + # it pops the enqueued items as batch. This is especially useful when you want to + # drop redandant items from the queue in order to process important items only, + # thus it's more efficient than the traditional queueing system. + # + # Caveats: + # - The order of the items are not guaranteed because of `sadd` (Redis Sets). + # + # Example: + # ``` + # class TheWorker + # def perform + # result = Gitlab::BatchPopQueueing.new('feature', 'queue').safe_execute([item]) do |items_in_queue| + # item = extract_the_most_important_item_from(items_in_queue) + # expensive_process(item) + # end + # + # if result[:status] == :finished && result[:new_items].present? + # item = extract_the_most_important_item_from(items_in_queue) + # TheWorker.perform_async(item.id) + # end + # end + # end + # ``` + # + class BatchPopQueueing + attr_reader :namespace, :queue_id + + EXTRA_QUEUE_EXPIRE_WINDOW = 1.hour + MAX_COUNTS_OF_POP_ALL = 1000 + + # Initialize queue + # + # @param [String] namespace The namespace of the exclusive lock and queue key. Typically, it's a feature name. + # @param [String] queue_id The identifier of the queue. + # @return [Boolean] + def initialize(namespace, queue_id) + raise ArgumentError if namespace.empty? || queue_id.empty? + + @namespace, @queue_id = namespace, queue_id + end + + ## + # Execute the given block in an exclusive lock. + # If there is the other thread has already working on the block, + # it enqueues the items without processing the block. + # + # @param [Array<String>] new_items New items to be added to the queue. + # @param [Time] lock_timeout The timeout of the exclusive lock. Generally, this value should be longer than the maximum prosess timing of the given block. + # @return [Hash] + # - status => One of the `:enqueued` or `:finished`. + # - new_items => Newly enqueued items during the given block had been processed. + # + # NOTE: If an exception is raised in the block, the poppped items will not be recovered. + # We should NOT re-enqueue the items in this case because it could end up in an infinite loop. + def safe_execute(new_items, lock_timeout: 10.minutes, &block) + enqueue(new_items, lock_timeout + EXTRA_QUEUE_EXPIRE_WINDOW) + + lease = Gitlab::ExclusiveLease.new(lock_key, timeout: lock_timeout) + + return { status: :enqueued } unless uuid = lease.try_obtain + + begin + all_args = pop_all + + yield all_args if block_given? + + { status: :finished, new_items: peek_all } + ensure + Gitlab::ExclusiveLease.cancel(lock_key, uuid) + end + end + + private + + def lock_key + @lock_key ||= "batch_pop_queueing:lock:#{namespace}:#{queue_id}" + end + + def queue_key + @queue_key ||= "batch_pop_queueing:queue:#{namespace}:#{queue_id}" + end + + def enqueue(items, expire_time) + Gitlab::Redis::Queues.with do |redis| + redis.sadd(queue_key, items) + redis.expire(queue_key, expire_time.to_i) + end + end + + def pop_all + Gitlab::Redis::Queues.with do |redis| + redis.spop(queue_key, MAX_COUNTS_OF_POP_ALL) + end + end + + def peek_all + Gitlab::Redis::Queues.with do |redis| + redis.smembers(queue_key) + end + end + end +end |