summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShinya Maeda <shinya@gitlab.com>2019-07-02 18:14:31 +0700
committerShinya Maeda <shinya@gitlab.com>2019-07-11 13:44:12 +0700
commit929ada0c0aa552017c4c3b1c51c56e05c001a047 (patch)
treec77f1e122ebb9c028e42f72d8fa23649edd35a46
parent43eeba0488b4133f5c55b81e833a73233107aba0 (diff)
downloadgitlab-ce-better-merge-train-exlusive-lock-ce.tar.gz
Efficient merge train locksbetter-merge-train-exlusive-lock-ce
Efficient merge train locks with Sequential Process helper.
-rw-r--r--lib/gitlab/batch_pop_queueing.rb112
-rw-r--r--spec/lib/gitlab/batch_pop_queueing_spec.rb147
2 files changed, 259 insertions, 0 deletions
diff --git a/lib/gitlab/batch_pop_queueing.rb b/lib/gitlab/batch_pop_queueing.rb
new file mode 100644
index 00000000000..61011abddf5
--- /dev/null
+++ b/lib/gitlab/batch_pop_queueing.rb
@@ -0,0 +1,112 @@
+# frozen_string_literal: true
+
+module Gitlab
+ ##
+ # This class is a queueing system for processing expensive tasks in an atomic manner,
+ # with batch popping to let you optimize the total processing time.
+ #
+ # In a usual queueing system, the first item starts being processed immediately
+ # and the following items wait until they are popped from the queue one by one.
+ # In this queueing system the first part is the same; however,
+ # it pops the enqueued items as a batch. This is especially useful when you want to
+ # drop redundant items from the queue in order to process important items only,
+ # so it can be more efficient than a traditional queueing system.
+ #
+ # Caveats:
+ # - The order of the items is not guaranteed because they are stored in a Redis Set (`sadd`).
+ #
+ # Example:
+ # ```
+ # class TheWorker
+ # def perform
+ # result = Gitlab::BatchPopQueueing.new('feature', 'queue').safe_execute([item]) do |items_in_queue|
+ # item = extract_the_most_important_item_from(items_in_queue)
+ # expensive_process(item)
+ # end
+ #
+ # if result[:status] == :finished && result[:new_items].present?
+ # item = extract_the_most_important_item_from(result[:new_items])
+ # TheWorker.perform_async(item.id)
+ # end
+ # end
+ # end
+ # ```
+ #
+ class BatchPopQueueing
+ attr_reader :namespace, :queue_id
+
+ EXTRA_QUEUE_EXPIRE_WINDOW = 1.hour
+ MAX_COUNTS_OF_POP_ALL = 1000
+
+ # Initialize queue
+ #
+ # @param [String] namespace The namespace of the exclusive lock and queue key. Typically, it's a feature name.
+ # @param [String] queue_id The identifier of the queue.
+ # @return [Boolean]
+ def initialize(namespace, queue_id)
+ raise ArgumentError if namespace.empty? || queue_id.empty?
+
+ @namespace, @queue_id = namespace, queue_id
+ end
+
+ ##
+ # Execute the given block in an exclusive lock.
+ # If there is the other thread has already working on the block,
+ # it enqueues the items without processing the block.
+ #
+ # @param [Array<String>] new_items New items to be added to the queue.
+ # @param [Time] lock_timeout The timeout of the exclusive lock. Generally, this value should be longer than the maximum prosess timing of the given block.
+ # @return [Hash]
+ # - status => One of the `:enqueued` or `:finished`.
+ # - new_items => Newly enqueued items during the given block had been processed.
+ #
+ # NOTE: If an exception is raised in the block, the poppped items will not be recovered.
+ # We should NOT re-enqueue the items in this case because it could end up in an infinite loop.
+ def safe_execute(new_items, lock_timeout: 10.minutes, &block)
+ enqueue(new_items, lock_timeout + EXTRA_QUEUE_EXPIRE_WINDOW)
+
+ lease = Gitlab::ExclusiveLease.new(lock_key, timeout: lock_timeout)
+
+ return { status: :enqueued } unless uuid = lease.try_obtain
+
+ begin
+ all_args = pop_all
+
+ yield all_args if block_given?
+
+ { status: :finished, new_items: peek_all }
+ ensure
+ Gitlab::ExclusiveLease.cancel(lock_key, uuid)
+ end
+ end
+
+ private
+
+ def lock_key
+ @lock_key ||= "batch_pop_queueing:lock:#{namespace}:#{queue_id}"
+ end
+
+ def queue_key
+ @queue_key ||= "batch_pop_queueing:queue:#{namespace}:#{queue_id}"
+ end
+
+ def enqueue(items, expire_time)
+ Gitlab::Redis::Queues.with do |redis|
+ redis.sadd(queue_key, items)
+ redis.expire(queue_key, expire_time.to_i)
+ end
+ end
+
+ def pop_all
+ Gitlab::Redis::Queues.with do |redis|
+ redis.spop(queue_key, MAX_COUNTS_OF_POP_ALL)
+ end
+ end
+
+ def peek_all
+ Gitlab::Redis::Queues.with do |redis|
+ redis.smembers(queue_key)
+ end
+ end
+ end
+end
diff --git a/spec/lib/gitlab/batch_pop_queueing_spec.rb b/spec/lib/gitlab/batch_pop_queueing_spec.rb
new file mode 100644
index 00000000000..28984d52024
--- /dev/null
+++ b/spec/lib/gitlab/batch_pop_queueing_spec.rb
@@ -0,0 +1,147 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+describe Gitlab::BatchPopQueueing do
+ include ExclusiveLeaseHelpers
+ using RSpec::Parameterized::TableSyntax
+
+ describe '#initialize' do
+ where(:namespace, :queue_id, :expect_error, :error_type) do
+ 'feature' | '1' | false | nil
+ :feature | '1' | false | nil
+ nil | '1' | true | NoMethodError
+ 'feature' | nil | true | NoMethodError
+ '' | '1' | true | ArgumentError
+ 'feature' | '' | true | ArgumentError
+ 'feature' | 1 | true | NoMethodError
+ end
+
+ with_them do
+ it do
+ if expect_error
+ expect { described_class.new(namespace, queue_id) }.to raise_error(error_type)
+ else
+ expect { described_class.new(namespace, queue_id) }.not_to raise_error
+ end
+ end
+ end
+ end
+
+ describe '#safe_execute', :clean_gitlab_redis_queues do
+ subject { queue.safe_execute(new_items, lock_timeout: lock_timeout) }
+
+ let(:queue) { described_class.new(namespace, queue_id) }
+ let(:namespace) { 'feature' }
+ let(:queue_id) { '1' }
+ let(:lock_timeout) { 10.minutes }
+ let(:new_items) { %w[A B] }
+ let(:lock_key) { queue.send(:lock_key) }
+ let(:queue_key) { queue.send(:queue_key) }
+
+ it 'enqueues new items always' do
+ Gitlab::Redis::Queues.with do |redis|
+ expect(redis).to receive(:sadd).with(queue_key, new_items)
+ expect(redis).to receive(:expire).with(queue_key, (lock_timeout + described_class::EXTRA_QUEUE_EXPIRE_WINDOW).to_i)
+ end
+
+ subject
+ end
+
+ it 'yields the new items with exclusive lease' do
+ uuid = 'test'
+ expect_to_obtain_exclusive_lease(lock_key, uuid, timeout: lock_timeout)
+ expect_to_cancel_exclusive_lease(lock_key, uuid)
+
+ expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) }
+ .to yield_with_args(match_array(new_items))
+ end
+
+ it 'returns the result and no items in the queue' do
+ expect(subject[:status]).to eq(:finished)
+ expect(subject[:new_items]).to be_empty
+
+ Gitlab::Redis::Queues.with do |redis|
+ expect(redis.llen(queue_key)).to be(0)
+ end
+ end
+
+ context 'when new items are enqueued during the process' do
+ it 'returns the result with newly added items' do
+ result = queue.safe_execute(new_items) do
+ queue.safe_execute(['C'])
+ end
+
+ expect(result[:status]).to eq(:finished)
+ expect(result[:new_items]).to eq(['C'])
+
+ Gitlab::Redis::Queues.with do |redis|
+ expect(redis.scard(queue_key)).to be(1)
+ end
+ end
+ end
+
+ context 'when interger items are enqueued' do
+ let(:new_items) { [1, 2, 3] }
+
+ it 'yields as String values' do
+ expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) }
+ .to yield_with_args(%w[1 2 3])
+ end
+ end
+
+ context 'when the queue key does not exist in Redis' do
+ before do
+ allow(queue).to receive(:enqueue) { }
+ end
+
+ it 'yields empty array' do
+ expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) }
+ .to yield_with_args([])
+ end
+ end
+
+ context 'when the other process has already been working on the queue' do
+ before do
+ stub_exclusive_lease_taken(lock_key, timeout: lock_timeout)
+ end
+
+ it 'does not yield the block' do
+ expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) }
+ .not_to yield_control
+ end
+
+ it 'returns the result' do
+ expect(subject[:status]).to eq(:enqueued)
+ end
+ end
+
+ context 'when a duplicate item is enqueued' do
+ it 'returns the poped items to the queue and raise an error' do
+ expect { |b| queue.safe_execute(%w[1 1 2 2], &b) }
+ .to yield_with_args(match_array(%w[1 2]))
+ end
+ end
+
+ context 'when there are two queues' do
+ it 'enqueues items to each queue' do
+ queue_1 = described_class.new(namespace, '1')
+ queue_2 = described_class.new(namespace, '2')
+
+ result_2 = nil
+
+ result_1 = queue_1.safe_execute(['A']) do |_|
+ result_2 = queue_2.safe_execute(['B']) do |_|
+ queue_1.safe_execute(['C'])
+ queue_2.safe_execute(['D'])
+ end
+ end
+
+ expect(result_1[:status]).to eq(:finished)
+ expect(result_1[:new_items]).to eq(['C'])
+ expect(result_2[:status]).to eq(:finished)
+ expect(result_2[:new_items]).to eq(['D'])
+ end
+ end
+ end
+end