summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Knox <psimyn@gmail.com>2017-08-22 21:11:28 +1000
committerSimon Knox <psimyn@gmail.com>2017-08-22 21:11:28 +1000
commit2fd66a75fe1045ab531c767fd882908bb54e576e (patch)
treec03c73a0099ac48828a76db267ec23220097e3fa
parent085e0e4ca0ca2789cb9e98d6ee79193684b94c07 (diff)
parent78a0d27e98cea4ed1b59377edc93588127b297fe (diff)
downloadgitlab-ce-2fd66a75fe1045ab531c767fd882908bb54e576e.tar.gz
Merge branch 'master' of gitlab.com:gitlab-org/gitlab-ce
-rw-r--r--app/workers/authorized_projects_worker.rb13
-rw-r--r--lib/gitlab/job_waiter.rb57
-rw-r--r--spec/lib/gitlab/job_waiter_spec.rb41
-rw-r--r--spec/workers/authorized_projects_worker_spec.rb16
4 files changed, 94 insertions, 33 deletions
diff --git a/app/workers/authorized_projects_worker.rb b/app/workers/authorized_projects_worker.rb
index 13207a8bc71..be4c77503bb 100644
--- a/app/workers/authorized_projects_worker.rb
+++ b/app/workers/authorized_projects_worker.rb
@@ -4,18 +4,25 @@ class AuthorizedProjectsWorker
# Schedules multiple jobs and waits for them to be completed.
def self.bulk_perform_and_wait(args_list)
- job_ids = bulk_perform_async(args_list)
+ waiter = Gitlab::JobWaiter.new(args_list.size)
- Gitlab::JobWaiter.new(job_ids).wait
+ # Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
+ # into [[1, "key"], [2, "key"], [3, "key"]]
+ waiting_args_list = args_list.map { |args| args << waiter.key }
+ bulk_perform_async(waiting_args_list)
+
+ waiter.wait
end
def self.bulk_perform_async(args_list)
Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
end
- def perform(user_id)
+ def perform(user_id, notify_key = nil)
user = User.find_by(id: user_id)
user&.refresh_authorized_projects
+ ensure
+ Gitlab::JobWaiter.notify(notify_key, jid) if notify_key
end
end
diff --git a/lib/gitlab/job_waiter.rb b/lib/gitlab/job_waiter.rb
index 208f0e1bbea..4d6bbda15f3 100644
--- a/lib/gitlab/job_waiter.rb
+++ b/lib/gitlab/job_waiter.rb
@@ -1,12 +1,31 @@
module Gitlab
# JobWaiter can be used to wait for a number of Sidekiq jobs to complete.
+ #
+ # Its use requires the cooperation of the sidekiq jobs themselves. Set up the
+ # waiter, then start the jobs, passing them its `key`. Their `perform` methods
+ # should look like:
+ #
+ # def perform(args, notify_key)
+ # # do work
+ # ensure
+ # ::Gitlab::JobWaiter.notify(notify_key, jid)
+ # end
+ #
+ # The JobWaiter blocks popping items from a Redis array. All the sidekiq jobs
+ # push to that array when done. Once the waiter has popped `count` items, it
+ # knows all the jobs are done.
class JobWaiter
- # The sleep interval between checking keys, in seconds.
- INTERVAL = 0.1
+ def self.notify(key, jid)
+ Gitlab::Redis::SharedState.with { |redis| redis.lpush(key, jid) }
+ end
+
+ attr_reader :key, :jobs_remaining, :finished
- # jobs - The job IDs to wait for.
- def initialize(jobs)
- @jobs = jobs
+ # jobs_remaining - the number of jobs left to wait for
+ def initialize(jobs_remaining)
+ @key = "gitlab:job_waiter:#{SecureRandom.uuid}"
+ @jobs_remaining = jobs_remaining
+ @finished = []
end
# Waits for all the jobs to be completed.
@@ -15,13 +34,33 @@ module Gitlab
# ensures we don't indefinitely block a caller in case a job takes
# long to process, or is never processed.
def wait(timeout = 10)
- start = Time.current
+ deadline = Time.now.utc + timeout
+
+ Gitlab::Redis::SharedState.with do |redis|
+ # Fallback key expiry: allow a long grace period to reduce the chance of
+ # a job pushing to an expired key and recreating it
+ redis.expire(key, [timeout * 2, 10.minutes.to_i].max)
+
+ while jobs_remaining > 0
+ # Redis will not take fractional seconds. Prefer waiting too long over
+ # not waiting long enough
+ seconds_left = (deadline - Time.now.utc).ceil
- while (Time.current - start) <= timeout
- break if SidekiqStatus.all_completed?(@jobs)
+ # Redis interprets 0 as "wait forever", so skip the final `blpop` call
+ break if seconds_left <= 0
- sleep(INTERVAL) # to not overload Redis too much.
+ list, jid = redis.blpop(key, timeout: seconds_left)
+ break unless list && jid # timed out
+
+ @finished << jid
+ @jobs_remaining -= 1
+ end
+
+ # All jobs have finished, so expire the key immediately
+ redis.expire(key, 0) if jobs_remaining == 0
end
+
+ finished
end
end
end
diff --git a/spec/lib/gitlab/job_waiter_spec.rb b/spec/lib/gitlab/job_waiter_spec.rb
index 6186cec2689..b0b4fdc09bc 100644
--- a/spec/lib/gitlab/job_waiter_spec.rb
+++ b/spec/lib/gitlab/job_waiter_spec.rb
@@ -1,30 +1,39 @@
require 'spec_helper'
describe Gitlab::JobWaiter do
- describe '#wait' do
- let(:waiter) { described_class.new(%w(a)) }
- it 'returns when all jobs have been completed' do
- expect(Gitlab::SidekiqStatus).to receive(:all_completed?).with(%w(a))
- .and_return(true)
+ describe '.notify' do
+ it 'pushes the jid to the named queue' do
+ key = 'gitlab:job_waiter:foo'
+ jid = 1
- expect(waiter).not_to receive(:sleep)
+ redis = double('redis')
+ expect(Gitlab::Redis::SharedState).to receive(:with).and_yield(redis)
+ expect(redis).to receive(:lpush).with(key, jid)
- waiter.wait
+ described_class.notify(key, jid)
end
+ end
+
+ describe '#wait' do
+ let(:waiter) { described_class.new(2) }
- it 'sleeps between checking the job statuses' do
- expect(Gitlab::SidekiqStatus).to receive(:all_completed?)
- .with(%w(a))
- .and_return(false, true)
+ it 'returns when all jobs have been completed' do
+ described_class.notify(waiter.key, 'a')
+ described_class.notify(waiter.key, 'b')
- expect(waiter).to receive(:sleep).with(described_class::INTERVAL)
+ result = nil
+ expect { Timeout.timeout(1) { result = waiter.wait(2) } }.not_to raise_error
- waiter.wait
+ expect(result).to contain_exactly('a', 'b')
end
- it 'returns when timing out' do
- expect(waiter).not_to receive(:sleep)
- waiter.wait(0)
+ it 'times out if not all jobs complete' do
+ described_class.notify(waiter.key, 'a')
+
+ result = nil
+ expect { Timeout.timeout(2) { result = waiter.wait(1) } }.not_to raise_error
+
+ expect(result).to contain_exactly('a')
end
end
end
diff --git a/spec/workers/authorized_projects_worker_spec.rb b/spec/workers/authorized_projects_worker_spec.rb
index 03b9b99e263..f8385ae7c72 100644
--- a/spec/workers/authorized_projects_worker_spec.rb
+++ b/spec/workers/authorized_projects_worker_spec.rb
@@ -29,21 +29,27 @@ describe AuthorizedProjectsWorker do
end
describe '#perform' do
- subject { described_class.new }
+ let(:user) { create(:user) }
- it "refreshes user's authorized projects" do
- user = create(:user)
+ subject(:job) { described_class.new }
+ it "refreshes user's authorized projects" do
expect_any_instance_of(User).to receive(:refresh_authorized_projects)
- subject.perform(user.id)
+ job.perform(user.id)
+ end
+
+ it 'notifies the JobWaiter when done if the key is provided' do
+ expect(Gitlab::JobWaiter).to receive(:notify).with('notify-key', job.jid)
+
+ job.perform(user.id, 'notify-key')
end
context "when the user is not found" do
it "does nothing" do
expect_any_instance_of(User).not_to receive(:refresh_authorized_projects)
- subject.perform(-1)
+ job.perform(-1)
end
end
end