summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDouwe Maan <douwe@selenight.nl>2017-11-29 16:30:17 +0100
committerDouwe Maan <douwe@selenight.nl>2017-12-05 11:59:39 +0100
commit1e6ca3c41ead23c5e433460c8c807ea73d9ec0ef (patch)
treeed6a5da0def848adc1a15f80e69d9c55651895a4
parenta5c3f1c8ff7da20183b172b2b0693a6010c9e86d (diff)
downloadgitlab-ce-dm-application-worker.tar.gz
Consistently schedule Sidekiq jobsdm-application-worker
-rw-r--r--app/models/group.rb1
-rw-r--r--app/models/key.rb1
-rw-r--r--app/models/member.rb1
-rw-r--r--app/models/service.rb2
-rw-r--r--app/models/user.rb2
-rw-r--r--app/services/system_hooks_service.rb6
-rw-r--r--app/services/web_hook_service.rb2
-rw-r--r--app/workers/authorized_projects_worker.rb5
-rw-r--r--app/workers/background_migration_worker.rb28
-rw-r--r--app/workers/concerns/application_worker.rb15
-rw-r--r--app/workers/expire_build_artifacts_worker.rb2
-rw-r--r--app/workers/namespaceless_project_destroy_worker.rb4
-rw-r--r--config/initializers/forbid_sidekiq_in_transactions.rb19
-rw-r--r--db/post_migrate/20170627101016_schedule_event_migrations.rb4
-rw-r--r--db/post_migrate/20171005130944_schedule_create_gpg_key_subkeys_from_gpg_keys.rb2
-rw-r--r--doc/development/background_migrations.md10
-rw-r--r--lib/after_commit_queue.rb26
-rw-r--r--lib/gitlab/database/migration_helpers.rb4
-rw-r--r--spec/lib/gitlab/database/migration_helpers_spec.rb8
-rw-r--r--spec/services/web_hook_service_spec.rb2
-rw-r--r--spec/workers/authorized_projects_worker_spec.rb1
-rw-r--r--spec/workers/background_migration_worker_spec.rb31
-rw-r--r--spec/workers/concerns/application_worker_spec.rb31
23 files changed, 107 insertions, 100 deletions
diff --git a/app/models/group.rb b/app/models/group.rb
index 27287f079a4..505e943e464 100644
--- a/app/models/group.rb
+++ b/app/models/group.rb
@@ -2,6 +2,7 @@ require 'carrierwave/orm/activerecord'
class Group < Namespace
include Gitlab::ConfigHelper
+ include AfterCommitQueue
include AccessRequestable
include Avatarable
include Referable
diff --git a/app/models/key.rb b/app/models/key.rb
index 815fd1de909..a3f8a5d6dc7 100644
--- a/app/models/key.rb
+++ b/app/models/key.rb
@@ -2,6 +2,7 @@ require 'digest/md5'
class Key < ActiveRecord::Base
include Gitlab::CurrentSettings
+ include AfterCommitQueue
include Sortable
belongs_to :user
diff --git a/app/models/member.rb b/app/models/member.rb
index cbbd58f2eaf..2fe5fda985f 100644
--- a/app/models/member.rb
+++ b/app/models/member.rb
@@ -1,4 +1,5 @@
class Member < ActiveRecord::Base
+ include AfterCommitQueue
include Sortable
include Importable
include Expirable
diff --git a/app/models/service.rb b/app/models/service.rb
index fdd2605e3e3..3c4f1885dd0 100644
--- a/app/models/service.rb
+++ b/app/models/service.rb
@@ -211,7 +211,7 @@ class Service < ActiveRecord::Base
def async_execute(data)
return unless supported_events.include?(data[:object_kind])
- Sidekiq::Client.enqueue(ProjectServiceWorker, id, data)
+ ProjectServiceWorker.perform_async(id, data)
end
def issue_tracker?
diff --git a/app/models/user.rb b/app/models/user.rb
index 76fd395be9a..38ee4ed50c1 100644
--- a/app/models/user.rb
+++ b/app/models/user.rb
@@ -7,6 +7,7 @@ class User < ActiveRecord::Base
include Gitlab::ConfigHelper
include Gitlab::CurrentSettings
include Gitlab::SQL::Pattern
+ include AfterCommitQueue
include Avatarable
include Referable
include Sortable
@@ -903,6 +904,7 @@ class User < ActiveRecord::Base
def post_destroy_hook
log_info("User \"#{name}\" (#{email}) was removed")
+
system_hook_service.execute_hooks_for(self, :destroy)
end
diff --git a/app/services/system_hooks_service.rb b/app/services/system_hooks_service.rb
index 911cc919bb8..690918b4a00 100644
--- a/app/services/system_hooks_service.rb
+++ b/app/services/system_hooks_service.rb
@@ -1,6 +1,10 @@
class SystemHooksService
def execute_hooks_for(model, event)
- execute_hooks(build_event_data(model, event))
+ data = build_event_data(model, event)
+
+ model.run_after_commit_or_now do
+ SystemHooksService.new.execute_hooks(data)
+ end
end
def execute_hooks(data, hooks_scope = :all)
diff --git a/app/services/web_hook_service.rb b/app/services/web_hook_service.rb
index cd99e0b90f9..6ebc7c89500 100644
--- a/app/services/web_hook_service.rb
+++ b/app/services/web_hook_service.rb
@@ -63,7 +63,7 @@ class WebHookService
end
def async_execute
- Sidekiq::Client.enqueue(WebHookWorker, hook.id, data, hook_name)
+ WebHookWorker.perform_async(hook.id, data, hook_name)
end
private
diff --git a/app/workers/authorized_projects_worker.rb b/app/workers/authorized_projects_worker.rb
index d4f334ec3b8..09559e3b696 100644
--- a/app/workers/authorized_projects_worker.rb
+++ b/app/workers/authorized_projects_worker.rb
@@ -16,11 +16,6 @@ class AuthorizedProjectsWorker
waiter.wait
end
- # Schedules multiple jobs to run in sidekiq without waiting for completion
- def self.bulk_perform_async(args_list)
- Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
- end
-
# Performs multiple jobs directly. Failed jobs will be put into sidekiq so
# they can benefit from retries
def self.bulk_perform_inline(args_list)
diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb
index 65791c4eaba..aeb3bc019b9 100644
--- a/app/workers/background_migration_worker.rb
+++ b/app/workers/background_migration_worker.rb
@@ -1,34 +1,6 @@
class BackgroundMigrationWorker
include ApplicationWorker
- # Enqueues a number of jobs in bulk.
- #
- # The `jobs` argument should be an Array of Arrays, each sub-array must be in
- # the form:
- #
- # [migration-class, [arg1, arg2, ...]]
- def self.perform_bulk(jobs)
- Sidekiq::Client.push_bulk('class' => self,
- 'queue' => sidekiq_options['queue'],
- 'args' => jobs)
- end
-
- # Schedules multiple jobs in bulk, with a delay.
- #
- def self.perform_bulk_in(delay, jobs)
- now = Time.now.to_i
- schedule = now + delay.to_i
-
- if schedule <= now
- raise ArgumentError, 'The schedule time must be in the future!'
- end
-
- Sidekiq::Client.push_bulk('class' => self,
- 'queue' => sidekiq_options['queue'],
- 'args' => jobs,
- 'at' => schedule)
- end
-
# Performs the background migration.
#
# See Gitlab::BackgroundMigration.perform for more information.
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb
index bf1ecaa0c6d..9c3bdabc49e 100644
--- a/app/workers/concerns/application_worker.rb
+++ b/app/workers/concerns/application_worker.rb
@@ -21,5 +21,20 @@ module ApplicationWorker
def queue
get_sidekiq_options['queue'].to_s
end
+
+ def bulk_perform_async(args_list)
+ Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
+ end
+
+ def bulk_perform_in(delay, args_list)
+ now = Time.now.to_i
+ schedule = now + delay.to_i
+
+ if schedule <= now
+ raise ArgumentError, 'The schedule time must be in the future!'
+ end
+
+ Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
+ end
end
end
diff --git a/app/workers/expire_build_artifacts_worker.rb b/app/workers/expire_build_artifacts_worker.rb
index 73ab4211080..87e5dca01fd 100644
--- a/app/workers/expire_build_artifacts_worker.rb
+++ b/app/workers/expire_build_artifacts_worker.rb
@@ -8,6 +8,6 @@ class ExpireBuildArtifactsWorker
build_ids = Ci::Build.with_expired_artifacts.pluck(:id)
build_ids = build_ids.map { |build_id| [build_id] }
- Sidekiq::Client.push_bulk('class' => ExpireBuildInstanceArtifactsWorker, 'args' => build_ids )
+ ExpireBuildInstanceArtifactsWorker.bulk_perform_async(build_ids)
end
end
diff --git a/app/workers/namespaceless_project_destroy_worker.rb b/app/workers/namespaceless_project_destroy_worker.rb
index 13d750e5876..adb25c2a170 100644
--- a/app/workers/namespaceless_project_destroy_worker.rb
+++ b/app/workers/namespaceless_project_destroy_worker.rb
@@ -8,10 +8,6 @@ class NamespacelessProjectDestroyWorker
include ApplicationWorker
include ExceptionBacktrace
- def self.bulk_perform_async(args_list)
- Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
- end
-
def perform(project_id)
begin
project = Project.unscoped.find(project_id)
diff --git a/config/initializers/forbid_sidekiq_in_transactions.rb b/config/initializers/forbid_sidekiq_in_transactions.rb
index a78711fe599..bedd57ede04 100644
--- a/config/initializers/forbid_sidekiq_in_transactions.rb
+++ b/config/initializers/forbid_sidekiq_in_transactions.rb
@@ -13,20 +13,19 @@ module Sidekiq
module ClassMethods
module NoSchedulingFromTransactions
- NESTING = ::Rails.env.test? ? 1 : 0
-
%i(perform_async perform_at perform_in).each do |name|
define_method(name) do |*args|
- return super(*args) if Sidekiq::Worker.skip_transaction_check
- return super(*args) unless ActiveRecord::Base.connection.open_transactions > NESTING
+ if !Sidekiq::Worker.skip_transaction_check && AfterCommitQueue.inside_transaction?
+ raise <<-MSG.strip_heredoc
+ `#{self}.#{name}` cannot be called inside a transaction as this can lead to
+ race conditions when the worker runs before the transaction is committed and
+ tries to access a model that has not been saved yet.
- raise <<-MSG.strip_heredoc
- `#{self}.#{name}` cannot be called inside a transaction as this can lead to
- race conditions when the worker runs before the transaction is committed and
- tries to access a model that has not been saved yet.
+ Use an `after_commit` hook, or include `AfterCommitQueue` and use a `run_after_commit` block instead.
+ MSG
+ end
- Use an `after_commit` hook, or include `AfterCommitQueue` and use a `run_after_commit` block instead.
- MSG
+ super(*args)
end
end
end
diff --git a/db/post_migrate/20170627101016_schedule_event_migrations.rb b/db/post_migrate/20170627101016_schedule_event_migrations.rb
index 1f34375ff0d..1e020d05f78 100644
--- a/db/post_migrate/20170627101016_schedule_event_migrations.rb
+++ b/db/post_migrate/20170627101016_schedule_event_migrations.rb
@@ -25,14 +25,14 @@ class ScheduleEventMigrations < ActiveRecord::Migration
# We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range.
- BackgroundMigrationWorker.perform_bulk(jobs)
+ BackgroundMigrationWorker.bulk_perform_async(jobs)
jobs.clear
end
jobs << ['MigrateEventsToPushEventPayloads', [min, max]]
end
- BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty?
+ BackgroundMigrationWorker.bulk_perform_async(jobs) unless jobs.empty?
end
def down
diff --git a/db/post_migrate/20171005130944_schedule_create_gpg_key_subkeys_from_gpg_keys.rb b/db/post_migrate/20171005130944_schedule_create_gpg_key_subkeys_from_gpg_keys.rb
index 01d56fbd490..467c584c2e0 100644
--- a/db/post_migrate/20171005130944_schedule_create_gpg_key_subkeys_from_gpg_keys.rb
+++ b/db/post_migrate/20171005130944_schedule_create_gpg_key_subkeys_from_gpg_keys.rb
@@ -19,7 +19,7 @@ class ScheduleCreateGpgKeySubkeysFromGpgKeys < ActiveRecord::Migration
[MIGRATION, [id]]
end
- BackgroundMigrationWorker.perform_bulk(jobs)
+ BackgroundMigrationWorker.bulk_perform_async(jobs)
end
end
diff --git a/doc/development/background_migrations.md b/doc/development/background_migrations.md
index 5452b0e7a2f..fd2b9d0e908 100644
--- a/doc/development/background_migrations.md
+++ b/doc/development/background_migrations.md
@@ -68,10 +68,10 @@ BackgroundMigrationWorker.perform_async('BackgroundMigrationClassName', [arg1, a
```
Usually it's better to enqueue jobs in bulk, for this you can use
-`BackgroundMigrationWorker.perform_bulk`:
+`BackgroundMigrationWorker.bulk_perform_async`:
```ruby
-BackgroundMigrationWorker.perform_bulk(
+BackgroundMigrationWorker.bulk_perform_async(
[['BackgroundMigrationClassName', [1]],
['BackgroundMigrationClassName', [2]]]
)
@@ -85,13 +85,13 @@ updates. Removals in turn can be handled by simply defining foreign keys with
cascading deletes.
If you would like to schedule jobs in bulk with a delay, you can use
-`BackgroundMigrationWorker.perform_bulk_in`:
+`BackgroundMigrationWorker.bulk_perform_in`:
```ruby
jobs = [['BackgroundMigrationClassName', [1]],
['BackgroundMigrationClassName', [2]]]
-BackgroundMigrationWorker.perform_bulk_in(5.minutes, jobs)
+BackgroundMigrationWorker.bulk_perform_in(5.minutes, jobs)
```
## Cleaning Up
@@ -201,7 +201,7 @@ class ScheduleExtractServicesUrl < ActiveRecord::Migration
['ExtractServicesUrl', [id]]
end
- BackgroundMigrationWorker.perform_bulk(jobs)
+ BackgroundMigrationWorker.bulk_perform_async(jobs)
end
end
diff --git a/lib/after_commit_queue.rb b/lib/after_commit_queue.rb
index 4750a2c373a..db63c5038ae 100644
--- a/lib/after_commit_queue.rb
+++ b/lib/after_commit_queue.rb
@@ -6,12 +6,34 @@ module AfterCommitQueue
after_rollback :_clear_after_commit_queue
end
- def run_after_commit(method = nil, &block)
- _after_commit_queue << proc { self.send(method) } if method # rubocop:disable GitlabSecurity/PublicSend
+ def run_after_commit(&block)
_after_commit_queue << block if block
+
+ true
+ end
+
+ def run_after_commit_or_now(&block)
+ if AfterCommitQueue.inside_transaction?
+ run_after_commit(&block)
+ else
+ instance_eval(&block)
+ end
+
true
end
+ def self.open_transactions_baseline
+ if ::Rails.env.test?
+ return DatabaseCleaner.connections.count { |conn| conn.strategy.is_a?(DatabaseCleaner::ActiveRecord::Transaction) }
+ end
+
+ 0
+ end
+
+ def self.inside_transaction?
+ ActiveRecord::Base.connection.open_transactions > open_transactions_baseline
+ end
+
protected
def _run_after_commit_queue
diff --git a/lib/gitlab/database/migration_helpers.rb b/lib/gitlab/database/migration_helpers.rb
index c276c3566b4..3f65bc912de 100644
--- a/lib/gitlab/database/migration_helpers.rb
+++ b/lib/gitlab/database/migration_helpers.rb
@@ -703,14 +703,14 @@ into similar problems in the future (e.g. when new tables are created).
# We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range.
- BackgroundMigrationWorker.perform_bulk(jobs)
+ BackgroundMigrationWorker.bulk_perform_async(jobs)
jobs.clear
end
jobs << [job_class_name, [start_id, end_id]]
end
- BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty?
+ BackgroundMigrationWorker.bulk_perform_async(jobs) unless jobs.empty?
end
# Queues background migration jobs for an entire table, batched by ID range.
diff --git a/spec/lib/gitlab/database/migration_helpers_spec.rb b/spec/lib/gitlab/database/migration_helpers_spec.rb
index 3c8350b3aad..664ba0f7234 100644
--- a/spec/lib/gitlab/database/migration_helpers_spec.rb
+++ b/spec/lib/gitlab/database/migration_helpers_spec.rb
@@ -942,8 +942,8 @@ describe Gitlab::Database::MigrationHelpers do
end
it 'queues jobs in groups of buffer size 1' do
- expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]]])
- expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id3, id3]]])
+ expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]]])
+ expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end
@@ -960,8 +960,8 @@ describe Gitlab::Database::MigrationHelpers do
end
it 'queues jobs in bulk all at once (big buffer size)' do
- expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]],
- ['FooJob', [id3, id3]]])
+ expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]],
+ ['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end
diff --git a/spec/services/web_hook_service_spec.rb b/spec/services/web_hook_service_spec.rb
index a669429ce3e..21910e69d2e 100644
--- a/spec/services/web_hook_service_spec.rb
+++ b/spec/services/web_hook_service_spec.rb
@@ -146,7 +146,7 @@ describe WebHookService do
let(:system_hook) { create(:system_hook) }
it 'enqueue WebHookWorker' do
- expect(Sidekiq::Client).to receive(:enqueue).with(WebHookWorker, project_hook.id, data, 'push_hooks')
+ expect(WebHookWorker).to receive(:perform_async).with(project_hook.id, data, 'push_hooks')
described_class.new(project_hook, data, 'push_hooks').async_execute
end
diff --git a/spec/workers/authorized_projects_worker_spec.rb b/spec/workers/authorized_projects_worker_spec.rb
index 90ed1309d4a..0d6eb536c33 100644
--- a/spec/workers/authorized_projects_worker_spec.rb
+++ b/spec/workers/authorized_projects_worker_spec.rb
@@ -65,7 +65,6 @@ describe AuthorizedProjectsWorker do
args_list = build_args_list(project.owner.id)
push_bulk_args = {
'class' => described_class,
- 'queue' => described_class.sidekiq_options['queue'],
'args' => args_list
}
diff --git a/spec/workers/background_migration_worker_spec.rb b/spec/workers/background_migration_worker_spec.rb
index 4f6e3474634..1c54cf55fa0 100644
--- a/spec/workers/background_migration_worker_spec.rb
+++ b/spec/workers/background_migration_worker_spec.rb
@@ -10,35 +10,4 @@ describe BackgroundMigrationWorker, :sidekiq do
described_class.new.perform('Foo', [10, 20])
end
end
-
- describe '.perform_bulk' do
- it 'enqueues background migrations in bulk' do
- Sidekiq::Testing.fake! do
- described_class.perform_bulk([['Foo', [1]], ['Foo', [2]]])
-
- expect(described_class.jobs.count).to eq 2
- expect(described_class.jobs).to all(include('enqueued_at'))
- end
- end
- end
-
- describe '.perform_bulk_in' do
- context 'when delay is valid' do
- it 'correctly schedules background migrations' do
- Sidekiq::Testing.fake! do
- described_class.perform_bulk_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
-
- expect(described_class.jobs.count).to eq 2
- expect(described_class.jobs).to all(include('at'))
- end
- end
- end
-
- context 'when delay is invalid' do
- it 'raises an ArgumentError exception' do
- expect { described_class.perform_bulk_in(-60, [['Foo']]) }
- .to raise_error(ArgumentError)
- end
- end
- end
end
diff --git a/spec/workers/concerns/application_worker_spec.rb b/spec/workers/concerns/application_worker_spec.rb
index fd434f58602..0145563e0ed 100644
--- a/spec/workers/concerns/application_worker_spec.rb
+++ b/spec/workers/concerns/application_worker_spec.rb
@@ -24,4 +24,35 @@ describe ApplicationWorker do
expect(worker.queue).to eq('some_queue')
end
end
+
+ describe '.bulk_perform_async' do
+ it 'enqueues jobs in bulk' do
+ Sidekiq::Testing.fake! do
+ worker.bulk_perform_async([['Foo', [1]], ['Foo', [2]]])
+
+ expect(worker.jobs.count).to eq 2
+ expect(worker.jobs).to all(include('enqueued_at'))
+ end
+ end
+ end
+
+ describe '.bulk_perform_in' do
+ context 'when delay is valid' do
+ it 'correctly schedules jobs' do
+ Sidekiq::Testing.fake! do
+ worker.bulk_perform_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
+
+ expect(worker.jobs.count).to eq 2
+ expect(worker.jobs).to all(include('at'))
+ end
+ end
+ end
+
+ context 'when delay is invalid' do
+ it 'raises an ArgumentError exception' do
+ expect { worker.bulk_perform_in(-60, [['Foo']]) }
+ .to raise_error(ArgumentError)
+ end
+ end
+ end
end