summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--app/workers/authorized_projects_worker.rb36
-rw-r--r--app/workers/concerns/waitable_worker.rb44
-rw-r--r--lib/gitlab/job_waiter.rb8
-rw-r--r--spec/workers/authorized_projects_worker_spec.rb79
-rw-r--r--spec/workers/concerns/waitable_worker_spec.rb92
5 files changed, 145 insertions, 114 deletions
diff --git a/app/workers/authorized_projects_worker.rb b/app/workers/authorized_projects_worker.rb
index 09559e3b696..d7e24491516 100644
--- a/app/workers/authorized_projects_worker.rb
+++ b/app/workers/authorized_projects_worker.rb
@@ -1,42 +1,10 @@
class AuthorizedProjectsWorker
include ApplicationWorker
+ prepend WaitableWorker
- # Schedules multiple jobs and waits for them to be completed.
- def self.bulk_perform_and_wait(args_list)
- # Short-circuit: it's more efficient to do small numbers of jobs inline
- return bulk_perform_inline(args_list) if args_list.size <= 3
-
- waiter = Gitlab::JobWaiter.new(args_list.size)
-
- # Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
- # into [[1, "key"], [2, "key"], [3, "key"]]
- waiting_args_list = args_list.map { |args| [*args, waiter.key] }
- bulk_perform_async(waiting_args_list)
-
- waiter.wait
- end
-
- # Performs multiple jobs directly. Failed jobs will be put into sidekiq so
- # they can benefit from retries
- def self.bulk_perform_inline(args_list)
- failed = []
-
- args_list.each do |args|
- begin
- new.perform(*args)
- rescue
- failed << args
- end
- end
-
- bulk_perform_async(failed) if failed.present?
- end
-
- def perform(user_id, notify_key = nil)
+ def perform(user_id)
user = User.find_by(id: user_id)
user&.refresh_authorized_projects
- ensure
- Gitlab::JobWaiter.notify(notify_key, jid) if notify_key
end
end
diff --git a/app/workers/concerns/waitable_worker.rb b/app/workers/concerns/waitable_worker.rb
new file mode 100644
index 00000000000..7d2266aea3c
--- /dev/null
+++ b/app/workers/concerns/waitable_worker.rb
@@ -0,0 +1,44 @@
+module WaitableWorker
+ extend ActiveSupport::Concern
+
+ module ClassMethods
+ # Schedules multiple jobs and waits for them to be completed.
+ def bulk_perform_and_wait(args_list)
+ # Short-circuit: it's more efficient to do small numbers of jobs inline
+ return bulk_perform_inline(args_list) if args_list.size <= 3
+
+ waiter = Gitlab::JobWaiter.new(args_list.size)
+
+ # Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
+ # into [[1, "key"], [2, "key"], [3, "key"]]
+ waiting_args_list = args_list.map { |args| [*args, waiter.key] }
+ bulk_perform_async(waiting_args_list)
+
+ waiter.wait
+ end
+
+ # Performs multiple jobs directly. Failed jobs will be put into sidekiq so
+ # they can benefit from retries
+ def bulk_perform_inline(args_list)
+ failed = []
+
+ args_list.each do |args|
+ begin
+ new.perform(*args)
+ rescue
+ failed << args
+ end
+ end
+
+ bulk_perform_async(failed) if failed.present?
+ end
+ end
+
+ def perform(*args)
+ notify_key = args.pop if Gitlab::JobWaiter.key?(args.last)
+
+ super(*args)
+ ensure
+ Gitlab::JobWaiter.notify(notify_key, jid) if notify_key
+ end
+end
diff --git a/lib/gitlab/job_waiter.rb b/lib/gitlab/job_waiter.rb
index f654508c391..f7a8eae0be4 100644
--- a/lib/gitlab/job_waiter.rb
+++ b/lib/gitlab/job_waiter.rb
@@ -15,16 +15,22 @@ module Gitlab
# push to that array when done. Once the waiter has popped `count` items, it
# knows all the jobs are done.
class JobWaiter
+ KEY_PREFIX = "gitlab:job_waiter".freeze
+
def self.notify(key, jid)
Gitlab::Redis::SharedState.with { |redis| redis.lpush(key, jid) }
end
+ def self.key?(key)
+ key.is_a?(String) && key =~ /\A#{KEY_PREFIX}:\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/
+ end
+
attr_reader :key, :finished
attr_accessor :jobs_remaining
# jobs_remaining - the number of jobs left to wait for
# key - The key of this waiter.
- def initialize(jobs_remaining = 0, key = "gitlab:job_waiter:#{SecureRandom.uuid}")
+ def initialize(jobs_remaining = 0, key = "#{KEY_PREFIX}:#{SecureRandom.uuid}")
@key = key
@jobs_remaining = jobs_remaining
@finished = []
diff --git a/spec/workers/authorized_projects_worker_spec.rb b/spec/workers/authorized_projects_worker_spec.rb
index 0d6eb536c33..d095138f6b7 100644
--- a/spec/workers/authorized_projects_worker_spec.rb
+++ b/spec/workers/authorized_projects_worker_spec.rb
@@ -1,79 +1,6 @@
require 'spec_helper'
describe AuthorizedProjectsWorker do
- let(:project) { create(:project) }
-
- def build_args_list(*ids, multiply: 1)
- args_list = ids.map { |id| [id] }
- args_list * multiply
- end
-
- describe '.bulk_perform_and_wait' do
- it 'schedules the ids and waits for the jobs to complete' do
- args_list = build_args_list(project.owner.id)
-
- project.owner.project_authorizations.delete_all
- described_class.bulk_perform_and_wait(args_list)
-
- expect(project.owner.project_authorizations.count).to eq(1)
- end
-
- it 'inlines workloads <= 3 jobs' do
- args_list = build_args_list(project.owner.id, multiply: 3)
- expect(described_class).to receive(:bulk_perform_inline).with(args_list)
-
- described_class.bulk_perform_and_wait(args_list)
- end
-
- it 'runs > 3 jobs using sidekiq' do
- project.owner.project_authorizations.delete_all
-
- expect(described_class).to receive(:bulk_perform_async).and_call_original
-
- args_list = build_args_list(project.owner.id, multiply: 4)
- described_class.bulk_perform_and_wait(args_list)
-
- expect(project.owner.project_authorizations.count).to eq(1)
- end
- end
-
- describe '.bulk_perform_inline' do
- it 'refreshes the authorizations inline' do
- project.owner.project_authorizations.delete_all
-
- expect_any_instance_of(described_class).to receive(:perform).and_call_original
-
- described_class.bulk_perform_inline(build_args_list(project.owner.id))
-
- expect(project.owner.project_authorizations.count).to eq(1)
- end
-
- it 'enqueues jobs if an error is raised' do
- invalid_id = -1
- args_list = build_args_list(project.owner.id, invalid_id)
-
- allow_any_instance_of(described_class).to receive(:perform).with(project.owner.id)
- allow_any_instance_of(described_class).to receive(:perform).with(invalid_id).and_raise(ArgumentError)
- expect(described_class).to receive(:bulk_perform_async).with(build_args_list(invalid_id))
-
- described_class.bulk_perform_inline(args_list)
- end
- end
-
- describe '.bulk_perform_async' do
- it "uses it's respective sidekiq queue" do
- args_list = build_args_list(project.owner.id)
- push_bulk_args = {
- 'class' => described_class,
- 'args' => args_list
- }
-
- expect(Sidekiq::Client).to receive(:push_bulk).with(push_bulk_args).once
-
- described_class.bulk_perform_async(args_list)
- end
- end
-
describe '#perform' do
let(:user) { create(:user) }
@@ -85,12 +12,6 @@ describe AuthorizedProjectsWorker do
job.perform(user.id)
end
- it 'notifies the JobWaiter when done if the key is provided' do
- expect(Gitlab::JobWaiter).to receive(:notify).with('notify-key', job.jid)
-
- job.perform(user.id, 'notify-key')
- end
-
context "when the user is not found" do
it "does nothing" do
expect_any_instance_of(User).not_to receive(:refresh_authorized_projects)
diff --git a/spec/workers/concerns/waitable_worker_spec.rb b/spec/workers/concerns/waitable_worker_spec.rb
new file mode 100644
index 00000000000..4af0de86ac9
--- /dev/null
+++ b/spec/workers/concerns/waitable_worker_spec.rb
@@ -0,0 +1,92 @@
+require 'spec_helper'
+
+describe WaitableWorker do
+ let(:worker) do
+ Class.new do
+ def self.name
+ 'Gitlab::Foo::Bar::DummyWorker'
+ end
+
+ class << self
+ cattr_accessor(:counter) { 0 }
+ end
+
+ include ApplicationWorker
+ prepend WaitableWorker
+
+ def perform(i = 0)
+ self.class.counter += i
+ end
+ end
+ end
+
+ subject(:job) { worker.new }
+
+ describe '.bulk_perform_and_wait' do
+ it 'schedules the jobs and waits for them to complete' do
+ worker.bulk_perform_and_wait([[1], [2]])
+
+ expect(worker.counter).to eq(3)
+ end
+
+ it 'inlines workloads <= 3 jobs' do
+ args_list = [[1], [2], [3]]
+ expect(worker).to receive(:bulk_perform_inline).with(args_list).and_call_original
+
+ worker.bulk_perform_and_wait(args_list)
+
+ expect(worker.counter).to eq(6)
+ end
+
+ it 'runs > 3 jobs using sidekiq' do
+ expect(worker).to receive(:bulk_perform_async)
+
+ worker.bulk_perform_and_wait([[1], [2], [3], [4]])
+ end
+ end
+
+ describe '.bulk_perform_inline' do
+ it 'runs the jobs inline' do
+ expect(worker).not_to receive(:bulk_perform_async)
+
+ worker.bulk_perform_inline([[1], [2]])
+
+ expect(worker.counter).to eq(3)
+ end
+
+ it 'enqueues jobs if an error is raised' do
+ expect(worker).to receive(:bulk_perform_async).with([['foo']])
+
+ worker.bulk_perform_inline([[1], ['foo']])
+ end
+ end
+
+ describe '#perform' do
+ shared_examples 'perform' do
+ it 'notifies the JobWaiter when done if the key is provided' do
+ key = Gitlab::JobWaiter.new.key
+ expect(Gitlab::JobWaiter).to receive(:notify).with(key, job.jid)
+
+ job.perform(*args, key)
+ end
+
+ it 'does not notify the JobWaiter when done if no key is provided' do
+ expect(Gitlab::JobWaiter).not_to receive(:notify)
+
+ job.perform(*args)
+ end
+ end
+
+ context 'when the worker takes arguments' do
+ let(:args) { [1] }
+
+ it_behaves_like 'perform'
+ end
+
+ context 'when the worker takes no arguments' do
+ let(:args) { [] }
+
+ it_behaves_like 'perform'
+ end
+ end
+end