summaryrefslogtreecommitdiff
path: root/app/workers
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/background_migration_worker.rb39
-rw-r--r--app/workers/build_success_worker.rb11
-rw-r--r--app/workers/concerns/new_issuable.rb26
-rw-r--r--app/workers/create_gpg_signature_worker.rb12
-rw-r--r--app/workers/email_receiver_worker.rb2
-rw-r--r--app/workers/expire_build_instance_artifacts_worker.rb2
-rw-r--r--app/workers/expire_job_cache_worker.rb27
-rw-r--r--app/workers/expire_pipeline_cache_worker.rb27
-rw-r--r--app/workers/git_garbage_collect_worker.rb39
-rw-r--r--app/workers/gitlab_shell_worker.rb2
-rw-r--r--app/workers/gitlab_usage_ping_worker.rb18
-rw-r--r--app/workers/invalid_gpg_signature_update_worker.rb12
-rw-r--r--app/workers/irker_worker.rb10
-rw-r--r--app/workers/merge_worker.rb6
-rw-r--r--app/workers/namespaceless_project_destroy_worker.rb39
-rw-r--r--app/workers/new_issue_worker.rb17
-rw-r--r--app/workers/new_merge_request_worker.rb17
-rw-r--r--app/workers/pages_worker.rb2
-rw-r--r--app/workers/pipeline_schedule_worker.rb22
-rw-r--r--app/workers/post_receive.rb46
-rw-r--r--app/workers/process_commit_worker.rb22
-rw-r--r--app/workers/project_cache_worker.rb6
-rw-r--r--app/workers/project_destroy_worker.rb9
-rw-r--r--app/workers/project_service_worker.rb2
-rw-r--r--app/workers/propagate_service_template_worker.rb21
-rw-r--r--app/workers/prune_old_events_worker.rb8
-rw-r--r--app/workers/remove_old_web_hook_logs_worker.rb10
-rw-r--r--app/workers/repository_check/batch_worker.rb8
-rw-r--r--app/workers/repository_check/clear_worker.rb2
-rw-r--r--app/workers/repository_check/single_repository_worker.rb2
-rw-r--r--app/workers/repository_fork_worker.rb50
-rw-r--r--app/workers/repository_import_worker.rb46
-rw-r--r--app/workers/stuck_ci_jobs_worker.rb2
-rw-r--r--app/workers/stuck_import_jobs_worker.rb50
-rw-r--r--app/workers/stuck_merge_jobs_worker.rb34
-rw-r--r--app/workers/system_hook_worker.rb10
-rw-r--r--app/workers/trigger_schedule_worker.rb18
-rw-r--r--app/workers/update_user_activity_worker.rb4
-rw-r--r--app/workers/web_hook_worker.rb (renamed from app/workers/project_web_hook_worker.rb)8
39 files changed, 511 insertions, 177 deletions
diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb
new file mode 100644
index 00000000000..45ce49bb5c0
--- /dev/null
+++ b/app/workers/background_migration_worker.rb
@@ -0,0 +1,39 @@
+class BackgroundMigrationWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+
+ # Enqueues a number of jobs in bulk.
+ #
+ # The `jobs` argument should be an Array of Arrays, each sub-array must be in
+ # the form:
+ #
+ # [migration-class, [arg1, arg2, ...]]
+ def self.perform_bulk(jobs)
+ Sidekiq::Client.push_bulk('class' => self,
+ 'queue' => sidekiq_options['queue'],
+ 'args' => jobs)
+ end
+
+ # Schedules multiple jobs in bulk, with a delay.
+ #
+ def self.perform_bulk_in(delay, jobs)
+ now = Time.now.to_i
+ schedule = now + delay.to_i
+
+ if schedule <= now
+ raise ArgumentError, 'The schedule time must be in the future!'
+ end
+
+ Sidekiq::Client.push_bulk('class' => self,
+ 'queue' => sidekiq_options['queue'],
+ 'args' => jobs,
+ 'at' => schedule)
+ end
+
+ # Performs the background migration.
+ #
+ # See Gitlab::BackgroundMigration.perform for more information.
+ def perform(class_name, arguments = [])
+ Gitlab::BackgroundMigration.perform(class_name, arguments)
+ end
+end
diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb
index e17add7421f..bf009dfab0f 100644
--- a/app/workers/build_success_worker.rb
+++ b/app/workers/build_success_worker.rb
@@ -11,15 +11,6 @@ class BuildSuccessWorker
private
def create_deployment(build)
- service = CreateDeploymentService.new(
- build.project, build.user,
- environment: build.environment,
- sha: build.sha,
- ref: build.ref,
- tag: build.tag,
- options: build.options.to_h[:environment],
- variables: build.variables)
-
- service.execute(build)
+ CreateDeploymentService.new(build).execute
end
end
diff --git a/app/workers/concerns/new_issuable.rb b/app/workers/concerns/new_issuable.rb
new file mode 100644
index 00000000000..eb0d6c9c36c
--- /dev/null
+++ b/app/workers/concerns/new_issuable.rb
@@ -0,0 +1,26 @@
+module NewIssuable
+ attr_reader :issuable, :user
+
+ def objects_found?(issuable_id, user_id)
+ set_user(user_id)
+ set_issuable(issuable_id)
+
+ user && issuable
+ end
+
+ def set_user(user_id)
+ @user = User.find_by(id: user_id)
+
+ log_error(User, user_id) unless @user
+ end
+
+ def set_issuable(issuable_id)
+ @issuable = issuable_class.find_by(id: issuable_id)
+
+ log_error(issuable_class, issuable_id) unless @issuable
+ end
+
+ def log_error(record_class, record_id)
+ Rails.logger.error("#{self.class}: couldn't find #{record_class} with ID=#{record_id}, skipping job")
+ end
+end
diff --git a/app/workers/create_gpg_signature_worker.rb b/app/workers/create_gpg_signature_worker.rb
new file mode 100644
index 00000000000..f34dff2d656
--- /dev/null
+++ b/app/workers/create_gpg_signature_worker.rb
@@ -0,0 +1,12 @@
+class CreateGpgSignatureWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+
+ def perform(commit_sha, project_id)
+ project = Project.find_by(id: project_id)
+ return unless project
+
+ # This calculates and caches the signature in the database
+ Gitlab::Gpg::Commit.new(project, commit_sha).signature
+ end
+end
diff --git a/app/workers/email_receiver_worker.rb b/app/workers/email_receiver_worker.rb
index d3f7e479a8d..1afa24c8e2a 100644
--- a/app/workers/email_receiver_worker.rb
+++ b/app/workers/email_receiver_worker.rb
@@ -31,8 +31,6 @@ class EmailReceiverWorker
when Gitlab::Email::EmptyEmailError
can_retry = true
"It appears that the email is blank. Make sure your reply is at the top of the email, we can't process inline replies."
- when Gitlab::Email::AutoGeneratedEmailError
- "The email was marked as 'auto generated', which we can't accept. Please create your comment through the web interface."
when Gitlab::Email::UserNotFoundError
"We couldn't figure out what user corresponds to the email. Please create your comment through the web interface."
when Gitlab::Email::UserBlockedError
diff --git a/app/workers/expire_build_instance_artifacts_worker.rb b/app/workers/expire_build_instance_artifacts_worker.rb
index eb403c134d1..7b59e976492 100644
--- a/app/workers/expire_build_instance_artifacts_worker.rb
+++ b/app/workers/expire_build_instance_artifacts_worker.rb
@@ -8,7 +8,7 @@ class ExpireBuildInstanceArtifactsWorker
.reorder(nil)
.find_by(id: build_id)
- return unless build.try(:project)
+ return unless build&.project && !build.project.pending_delete
Rails.logger.info "Removing artifacts for build #{build.id}..."
build.erase_artifacts!
diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb
new file mode 100644
index 00000000000..e383202260d
--- /dev/null
+++ b/app/workers/expire_job_cache_worker.rb
@@ -0,0 +1,27 @@
+class ExpireJobCacheWorker
+ include Sidekiq::Worker
+ include BuildQueue
+
+ def perform(job_id)
+ job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id)
+ return unless job
+
+ pipeline = job.pipeline
+ project = job.project
+
+ Gitlab::EtagCaching::Store.new.tap do |store|
+ store.touch(project_pipeline_path(project, pipeline))
+ store.touch(project_job_path(project, job))
+ end
+ end
+
+ private
+
+ def project_pipeline_path(project, pipeline)
+ Gitlab::Routing.url_helpers.project_pipeline_path(project, pipeline, format: :json)
+ end
+
+ def project_job_path(project, job)
+ Gitlab::Routing.url_helpers.project_build_path(project, job.id, format: :json)
+ end
+end
diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb
index 603e2f1aaea..7c02d6cf892 100644
--- a/app/workers/expire_pipeline_cache_worker.rb
+++ b/app/workers/expire_pipeline_cache_worker.rb
@@ -10,6 +10,7 @@ class ExpirePipelineCacheWorker
store = Gitlab::EtagCaching::Store.new
store.touch(project_pipelines_path(project))
+ store.touch(project_pipeline_path(project, pipeline))
store.touch(commit_pipelines_path(project, pipeline.commit)) if pipeline.commit
store.touch(new_merge_request_pipelines_path(project))
each_pipelines_merge_request_path(project, pipeline) do |path|
@@ -22,34 +23,24 @@ class ExpirePipelineCacheWorker
private
def project_pipelines_path(project)
- Gitlab::Routing.url_helpers.namespace_project_pipelines_path(
- project.namespace,
- project,
- format: :json)
+ Gitlab::Routing.url_helpers.project_pipelines_path(project, format: :json)
+ end
+
+ def project_pipeline_path(project, pipeline)
+ Gitlab::Routing.url_helpers.project_pipeline_path(project, pipeline, format: :json)
end
def commit_pipelines_path(project, commit)
- Gitlab::Routing.url_helpers.pipelines_namespace_project_commit_path(
- project.namespace,
- project,
- commit.id,
- format: :json)
+ Gitlab::Routing.url_helpers.pipelines_project_commit_path(project, commit.id, format: :json)
end
def new_merge_request_pipelines_path(project)
- Gitlab::Routing.url_helpers.new_namespace_project_merge_request_path(
- project.namespace,
- project,
- format: :json)
+ Gitlab::Routing.url_helpers.project_new_merge_request_path(project, format: :json)
end
def each_pipelines_merge_request_path(project, pipeline)
pipeline.all_merge_requests.each do |merge_request|
- path = Gitlab::Routing.url_helpers.pipelines_namespace_project_merge_request_path(
- project.namespace,
- project,
- merge_request,
- format: :json)
+ path = Gitlab::Routing.url_helpers.pipelines_project_merge_request_path(project, merge_request, format: :json)
yield(path)
end
diff --git a/app/workers/git_garbage_collect_worker.rb b/app/workers/git_garbage_collect_worker.rb
index d369b639ae9..c95497dfaba 100644
--- a/app/workers/git_garbage_collect_worker.rb
+++ b/app/workers/git_garbage_collect_worker.rb
@@ -5,6 +5,12 @@ class GitGarbageCollectWorker
sidekiq_options retry: false
+ GITALY_MIGRATED_TASKS = {
+ gc: :garbage_collect,
+ full_repack: :repack_full,
+ incremental_repack: :repack_incremental
+ }.freeze
+
def perform(project_id, task = :gc, lease_key = nil, lease_uuid = nil)
project = Project.find(project_id)
task = task.to_sym
@@ -15,8 +21,14 @@ class GitGarbageCollectWorker
Gitlab::GitLogger.info(description)
- output, status = Gitlab::Popen.popen(cmd, repo_path)
- Gitlab::GitLogger.error("#{description} failed:\n#{output}") unless status.zero?
+ gitaly_migrate(GITALY_MIGRATED_TASKS[task]) do |is_enabled|
+ if is_enabled
+ gitaly_call(task, project.repository.raw_repository)
+ else
+ output, status = Gitlab::Popen.popen(cmd, repo_path)
+ Gitlab::GitLogger.error("#{description} failed:\n#{output}") unless status.zero?
+ end
+ end
# Refresh the branch cache in case garbage collection caused a ref lookup to fail
flush_ref_caches(project) if task == :gc
@@ -26,6 +38,19 @@ class GitGarbageCollectWorker
private
+ ## `repository` has to be a Gitlab::Git::Repository
+ def gitaly_call(task, repository)
+ client = Gitlab::GitalyClient::RepositoryService.new(repository)
+ case task
+ when :gc
+ client.garbage_collect(bitmaps_enabled?)
+ when :full_repack
+ client.repack_full(bitmaps_enabled?)
+ when :incremental_repack
+ client.repack_incremental
+ end
+ end
+
def command(task)
case task
when :gc
@@ -55,4 +80,14 @@ class GitGarbageCollectWorker
config_value = write_bitmaps ? 'true' : 'false'
%W[git -c repack.writeBitmaps=#{config_value}]
end
+
+ def gitaly_migrate(method, &block)
+ Gitlab::GitalyClient.migrate(method, &block)
+ rescue GRPC::NotFound => e
+ Gitlab::GitLogger.error("#{method} failed:\nRepository not found")
+ raise Gitlab::Git::Repository::NoRepository.new(e)
+ rescue GRPC::BadStatus => e
+ Gitlab::GitLogger.error("#{method} failed:\n#{e}")
+ raise Gitlab::Git::CommandError.new(e)
+ end
end
diff --git a/app/workers/gitlab_shell_worker.rb b/app/workers/gitlab_shell_worker.rb
index 964287a1793..0ec871e00e1 100644
--- a/app/workers/gitlab_shell_worker.rb
+++ b/app/workers/gitlab_shell_worker.rb
@@ -4,6 +4,6 @@ class GitlabShellWorker
include DedicatedSidekiqQueue
def perform(action, *arg)
- gitlab_shell.send(action, *arg)
+ gitlab_shell.__send__(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
end
end
diff --git a/app/workers/gitlab_usage_ping_worker.rb b/app/workers/gitlab_usage_ping_worker.rb
index 2f02235b0ac..0a55aab63fd 100644
--- a/app/workers/gitlab_usage_ping_worker.rb
+++ b/app/workers/gitlab_usage_ping_worker.rb
@@ -3,29 +3,17 @@ class GitlabUsagePingWorker
include Sidekiq::Worker
include CronjobQueue
- include HTTParty
def perform
- return unless current_application_settings.usage_ping_enabled
-
# Multiple Sidekiq workers could run this. We should only do this at most once a day.
return unless try_obtain_lease
- begin
- HTTParty.post(url,
- body: Gitlab::UsageData.to_json(force_refresh: true),
- headers: { 'Content-type' => 'application/json' }
- )
- rescue HTTParty::Error => e
- Rails.logger.info "Unable to contact GitLab, Inc.: #{e}"
- end
+ SubmitUsagePingService.new.execute
end
+ private
+
def try_obtain_lease
Gitlab::ExclusiveLease.new('gitlab_usage_ping_worker:ping', timeout: LEASE_TIMEOUT).try_obtain
end
-
- def url
- 'https://version.gitlab.com/usage_data'
- end
end
diff --git a/app/workers/invalid_gpg_signature_update_worker.rb b/app/workers/invalid_gpg_signature_update_worker.rb
new file mode 100644
index 00000000000..db6b1ea8e8d
--- /dev/null
+++ b/app/workers/invalid_gpg_signature_update_worker.rb
@@ -0,0 +1,12 @@
+class InvalidGpgSignatureUpdateWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+
+ def perform(gpg_key_id)
+ gpg_key = GpgKey.find_by(id: gpg_key_id)
+
+ return unless gpg_key
+
+ Gitlab::Gpg::InvalidGpgSignatureUpdater.new(gpg_key).run
+ end
+end
diff --git a/app/workers/irker_worker.rb b/app/workers/irker_worker.rb
index c9658b3fe17..3dd14466994 100644
--- a/app/workers/irker_worker.rb
+++ b/app/workers/irker_worker.rb
@@ -66,7 +66,7 @@ class IrkerWorker
end
def send_new_branch(project, repo_name, committer, branch)
- repo_path = project.path_with_namespace
+ repo_path = project.full_path
newbranch = "#{Gitlab.config.gitlab.url}/#{repo_path}/branches"
newbranch = "\x0302\x1f#{newbranch}\x0f" if @colors
@@ -109,7 +109,7 @@ class IrkerWorker
end
def send_commits_count(data, project, repo, committer, branch)
- url = compare_url data, project.path_with_namespace
+ url = compare_url data, project.full_path
commits = colorize_commits data['total_commits_count']
new_commits = 'new commit'
@@ -142,10 +142,10 @@ class IrkerWorker
end
def files_count(commit)
- diffs = commit.raw_diffs(deltas_only: true)
+ diff_size = commit.raw_deltas.size
- files = "#{diffs.real_size} file"
- files += 's' if diffs.size > 1
+ files = "#{diff_size} file"
+ files += 's' if diff_size > 1
files
end
diff --git a/app/workers/merge_worker.rb b/app/workers/merge_worker.rb
index 79efca4f2f9..c3b58df92c1 100644
--- a/app/workers/merge_worker.rb
+++ b/app/workers/merge_worker.rb
@@ -7,7 +7,9 @@ class MergeWorker
current_user = User.find(current_user_id)
merge_request = MergeRequest.find(merge_request_id)
- MergeRequests::MergeService.new(merge_request.target_project, current_user, params).
- execute(merge_request)
+ merge_request.update_column(:merge_jid, jid)
+
+ MergeRequests::MergeService.new(merge_request.target_project, current_user, params)
+ .execute(merge_request)
end
end
diff --git a/app/workers/namespaceless_project_destroy_worker.rb b/app/workers/namespaceless_project_destroy_worker.rb
new file mode 100644
index 00000000000..a9073742ff7
--- /dev/null
+++ b/app/workers/namespaceless_project_destroy_worker.rb
@@ -0,0 +1,39 @@
+# Worker to destroy projects that do not have a namespace
+#
+# It destroys everything it can without having the info about the namespace it
+# used to belong to. Projects in this state should be rare.
+# The worker will reject doing anything for projects that *do* have a
+# namespace. For those use ProjectDestroyWorker instead.
+class NamespacelessProjectDestroyWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+
+ def self.bulk_perform_async(args_list)
+ Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
+ end
+
+ def perform(project_id)
+ begin
+ project = Project.unscoped.find(project_id)
+ rescue ActiveRecord::RecordNotFound
+ return
+ end
+ return unless project.namespace_id.nil? # Reject doing anything for projects that *do* have a namespace
+
+ project.team.truncate
+
+ unlink_fork(project) if project.forked?
+
+ project.destroy!
+ end
+
+ private
+
+ def unlink_fork(project)
+ merge_requests = project.forked_from_project.merge_requests.opened.from_project(project)
+
+ merge_requests.update_all(state: 'closed')
+
+ project.forked_project_link.destroy
+ end
+end
diff --git a/app/workers/new_issue_worker.rb b/app/workers/new_issue_worker.rb
new file mode 100644
index 00000000000..d9a8e892e90
--- /dev/null
+++ b/app/workers/new_issue_worker.rb
@@ -0,0 +1,17 @@
+class NewIssueWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+ include NewIssuable
+
+ def perform(issue_id, user_id)
+ return unless objects_found?(issue_id, user_id)
+
+ EventCreateService.new.open_issue(issuable, user)
+ NotificationService.new.new_issue(issuable, user)
+ issuable.create_cross_references!(user)
+ end
+
+ def issuable_class
+ Issue
+ end
+end
diff --git a/app/workers/new_merge_request_worker.rb b/app/workers/new_merge_request_worker.rb
new file mode 100644
index 00000000000..1910c490159
--- /dev/null
+++ b/app/workers/new_merge_request_worker.rb
@@ -0,0 +1,17 @@
+class NewMergeRequestWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+ include NewIssuable
+
+ def perform(merge_request_id, user_id)
+ return unless objects_found?(merge_request_id, user_id)
+
+ EventCreateService.new.open_mr(issuable, user)
+ NotificationService.new.new_merge_request(issuable, user)
+ issuable.create_cross_references!(user)
+ end
+
+ def issuable_class
+ MergeRequest
+ end
+end
diff --git a/app/workers/pages_worker.rb b/app/workers/pages_worker.rb
index 4eeb9666bb0..64788da7299 100644
--- a/app/workers/pages_worker.rb
+++ b/app/workers/pages_worker.rb
@@ -4,7 +4,7 @@ class PagesWorker
sidekiq_options queue: :pages, retry: false
def perform(action, *arg)
- send(action, *arg)
+ send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
end
def deploy(build_id)
diff --git a/app/workers/pipeline_schedule_worker.rb b/app/workers/pipeline_schedule_worker.rb
new file mode 100644
index 00000000000..d7087f20dfc
--- /dev/null
+++ b/app/workers/pipeline_schedule_worker.rb
@@ -0,0 +1,22 @@
+class PipelineScheduleWorker
+ include Sidekiq::Worker
+ include CronjobQueue
+
+ def perform
+ Ci::PipelineSchedule.active.where("next_run_at < ?", Time.now)
+ .preload(:owner, :project).find_each do |schedule|
+ begin
+ pipeline = Ci::CreatePipelineService.new(schedule.project,
+ schedule.owner,
+ ref: schedule.ref)
+ .execute(:schedule, save_on_errors: false, schedule: schedule)
+
+ schedule.deactivate! unless pipeline.persisted?
+ rescue => e
+ Rails.logger.error "#{schedule.id}: Failed to create a scheduled pipeline: #{e.message}"
+ ensure
+ schedule.schedule_next_run!
+ end
+ end
+ end
+end
diff --git a/app/workers/post_receive.rb b/app/workers/post_receive.rb
index 015a41b6e82..b8f8d3750d9 100644
--- a/app/workers/post_receive.rb
+++ b/app/workers/post_receive.rb
@@ -2,34 +2,34 @@ class PostReceive
include Sidekiq::Worker
include DedicatedSidekiqQueue
- def perform(repo_path, identifier, changes)
- repo_relative_path = Gitlab::RepoPath.strip_storage_path(repo_path)
+ def perform(gl_repository, identifier, changes)
+ project, is_wiki = Gitlab::GlRepository.parse(gl_repository)
+
+ if project.nil?
+ log("Triggered hook for non-existing project with gl_repository \"#{gl_repository}\"")
+ return false
+ end
changes = Base64.decode64(changes) unless changes.include?(' ')
# Use Sidekiq.logger so arguments can be correlated with execution
# time and thread ID's.
Sidekiq.logger.info "changes: #{changes.inspect}" if ENV['SIDEKIQ_LOG_ARGUMENTS']
- post_received = Gitlab::GitPostReceive.new(repo_relative_path, identifier, changes)
-
- if post_received.project.nil?
- log("Triggered hook for non-existing project with full path \"#{repo_relative_path}\"")
- return false
- end
+ post_received = Gitlab::GitPostReceive.new(project, identifier, changes)
- if post_received.wiki?
- # Nothing defined here yet.
- elsif post_received.regular_project?
- process_project_changes(post_received)
+ if is_wiki
+ process_wiki_changes(post_received)
else
- log("Triggered hook for unidentifiable repository type with full path \"#{repo_relative_path}\"")
- false
+ process_project_changes(post_received)
end
end
+ private
+
def process_project_changes(post_received)
- post_received.changes.each do |change|
- oldrev, newrev, ref = change.strip.split(' ')
+ changes = []
+ refs = Set.new
+ post_received.changes_refs do |oldrev, newrev, ref|
@user ||= post_received.identify(newrev)
unless @user
@@ -42,10 +42,22 @@ class PostReceive
elsif Gitlab::Git.branch_ref?(ref)
GitPushService.new(post_received.project, @user, oldrev: oldrev, newrev: newrev, ref: ref).execute
end
+
+ changes << Gitlab::DataBuilder::Repository.single_change(oldrev, newrev, ref)
+ refs << ref
end
+
+ after_project_changes_hooks(post_received, @user, refs.to_a, changes)
end
- private
+ def after_project_changes_hooks(post_received, user, refs, changes)
+ hook_data = Gitlab::DataBuilder::Repository.update(post_received.project, user, changes, refs)
+ SystemHooksService.new.execute_hooks(hook_data, :repository_update_hooks)
+ end
+
+ def process_wiki_changes(post_received)
+ # Nothing defined here yet.
+ end
def log(message)
Gitlab::GitLogger.error("POST-RECEIVE: #{message}")
diff --git a/app/workers/process_commit_worker.rb b/app/workers/process_commit_worker.rb
index 2f7967cf531..c0c03848a40 100644
--- a/app/workers/process_commit_worker.rb
+++ b/app/workers/process_commit_worker.rb
@@ -17,12 +17,14 @@ class ProcessCommitWorker
project = Project.find_by(id: project_id)
return unless project
+ return if commit_exists_in_upstream?(project, commit_hash)
user = User.find_by(id: user_id)
return unless user
commit = build_commit(project, commit_hash)
+
author = commit.author || user
process_commit_message(project, commit, user, author, default)
@@ -45,8 +47,8 @@ class ProcessCommitWorker
# therefor we use IssueCollection here and skip the authorization check in
# Issues::CloseService#execute.
IssueCollection.new(issues).updatable_by_user(user).each do |issue|
- Issues::CloseService.new(project, author).
- close_issue(issue, commit: commit)
+ Issues::CloseService.new(project, author)
+ .close_issue(issue, commit: commit)
end
end
@@ -55,8 +57,8 @@ class ProcessCommitWorker
return if mentioned_issues.empty?
- Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil).
- update_all(first_mentioned_in_commit_at: commit.committed_date)
+ Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil)
+ .update_all(first_mentioned_in_commit_at: commit.committed_date)
end
def build_commit(project, hash)
@@ -73,4 +75,16 @@ class ProcessCommitWorker
Commit.from_hash(hash, project)
end
+
+ private
+
+ # Avoid reprocessing commits that already exist in the upstream
+ # when project is forked. This will also prevent duplicated system notes.
+ def commit_exists_in_upstream?(project, commit_hash)
+ return false unless project.forked?
+
+ upstream_project = project.forked_from_project
+ commit_id = commit_hash.with_indifferent_access[:id]
+ upstream_project.commit(commit_id).present?
+ end
end
diff --git a/app/workers/project_cache_worker.rb b/app/workers/project_cache_worker.rb
index 8ff9d07860f..505ff9e086e 100644
--- a/app/workers/project_cache_worker.rb
+++ b/app/workers/project_cache_worker.rb
@@ -32,8 +32,8 @@ class ProjectCacheWorker
private
def try_obtain_lease_for(project_id, section)
- Gitlab::ExclusiveLease.
- new("project_cache_worker:#{project_id}:#{section}", timeout: LEASE_TIMEOUT).
- try_obtain
+ Gitlab::ExclusiveLease
+ .new("project_cache_worker:#{project_id}:#{section}", timeout: LEASE_TIMEOUT)
+ .try_obtain
end
end
diff --git a/app/workers/project_destroy_worker.rb b/app/workers/project_destroy_worker.rb
index b462327490e..a9188b78460 100644
--- a/app/workers/project_destroy_worker.rb
+++ b/app/workers/project_destroy_worker.rb
@@ -3,14 +3,11 @@ class ProjectDestroyWorker
include DedicatedSidekiqQueue
def perform(project_id, user_id, params)
- begin
- project = Project.unscoped.find(project_id)
- rescue ActiveRecord::RecordNotFound
- return
- end
-
+ project = Project.find(project_id)
user = User.find(user_id)
::Projects::DestroyService.new(project, user, params.symbolize_keys).execute
+ rescue ActiveRecord::RecordNotFound => error
+ logger.error("Failed to delete project (#{project_id}): #{error.message}")
end
end
diff --git a/app/workers/project_service_worker.rb b/app/workers/project_service_worker.rb
index fdfdeab7b41..4883d848c53 100644
--- a/app/workers/project_service_worker.rb
+++ b/app/workers/project_service_worker.rb
@@ -2,6 +2,8 @@ class ProjectServiceWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
+ sidekiq_options dead: false
+
def perform(hook_id, data)
data = data.with_indifferent_access
Service.find(hook_id).execute(data)
diff --git a/app/workers/propagate_service_template_worker.rb b/app/workers/propagate_service_template_worker.rb
new file mode 100644
index 00000000000..6b607451c7a
--- /dev/null
+++ b/app/workers/propagate_service_template_worker.rb
@@ -0,0 +1,21 @@
+# Worker for updating any project specific caches.
+class PropagateServiceTemplateWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+
+ LEASE_TIMEOUT = 4.hours.to_i
+
+ def perform(template_id)
+ return unless try_obtain_lease_for(template_id)
+
+ Projects::PropagateServiceTemplate.propagate(Service.find_by(id: template_id))
+ end
+
+ private
+
+ def try_obtain_lease_for(template_id)
+ Gitlab::ExclusiveLease
+ .new("propagate_service_template_worker:#{template_id}", timeout: LEASE_TIMEOUT)
+ .try_obtain
+ end
+end
diff --git a/app/workers/prune_old_events_worker.rb b/app/workers/prune_old_events_worker.rb
index 392abb9c21b..2b43bb19ad1 100644
--- a/app/workers/prune_old_events_worker.rb
+++ b/app/workers/prune_old_events_worker.rb
@@ -10,9 +10,9 @@ class PruneOldEventsWorker
'(id IN (SELECT id FROM (?) ids_to_remove))',
Event.unscoped.where(
'created_at < ?',
- (12.months + 1.day).ago).
- select(:id).
- limit(10_000)).
- delete_all
+ (12.months + 1.day).ago)
+ .select(:id)
+ .limit(10_000))
+ .delete_all
end
end
diff --git a/app/workers/remove_old_web_hook_logs_worker.rb b/app/workers/remove_old_web_hook_logs_worker.rb
new file mode 100644
index 00000000000..555e1bb8691
--- /dev/null
+++ b/app/workers/remove_old_web_hook_logs_worker.rb
@@ -0,0 +1,10 @@
+class RemoveOldWebHookLogsWorker
+ include Sidekiq::Worker
+ include CronjobQueue
+
+ WEB_HOOK_LOG_LIFETIME = 2.days
+
+ def perform
+ WebHookLog.destroy_all(['created_at < ?', Time.now - WEB_HOOK_LOG_LIFETIME])
+ end
+end
diff --git a/app/workers/repository_check/batch_worker.rb b/app/workers/repository_check/batch_worker.rb
index c3e7491ec4e..b94d83bd709 100644
--- a/app/workers/repository_check/batch_worker.rb
+++ b/app/workers/repository_check/batch_worker.rb
@@ -32,10 +32,10 @@ module RepositoryCheck
# has to sit and wait for this query to finish.
def project_ids
limit = 10_000
- never_checked_projects = Project.where('last_repository_check_at IS NULL AND created_at < ?', 24.hours.ago).
- limit(limit).pluck(:id)
- old_check_projects = Project.where('last_repository_check_at < ?', 1.month.ago).
- reorder('last_repository_check_at ASC').limit(limit).pluck(:id)
+ never_checked_projects = Project.where('last_repository_check_at IS NULL AND created_at < ?', 24.hours.ago)
+ .limit(limit).pluck(:id)
+ old_check_projects = Project.where('last_repository_check_at < ?', 1.month.ago)
+ .reorder('last_repository_check_at ASC').limit(limit).pluck(:id)
never_checked_projects + old_check_projects
end
diff --git a/app/workers/repository_check/clear_worker.rb b/app/workers/repository_check/clear_worker.rb
index 1f1b38540ee..85bc9103538 100644
--- a/app/workers/repository_check/clear_worker.rb
+++ b/app/workers/repository_check/clear_worker.rb
@@ -8,7 +8,7 @@ module RepositoryCheck
Project.select(:id).find_in_batches(batch_size: 100) do |batch|
Project.where(id: batch.map(&:id)).update_all(
last_repository_check_failed: nil,
- last_repository_check_at: nil,
+ last_repository_check_at: nil
)
end
end
diff --git a/app/workers/repository_check/single_repository_worker.rb b/app/workers/repository_check/single_repository_worker.rb
index 3d8bfc6fc6c..164586cf0b7 100644
--- a/app/workers/repository_check/single_repository_worker.rb
+++ b/app/workers/repository_check/single_repository_worker.rb
@@ -7,7 +7,7 @@ module RepositoryCheck
project = Project.find(project_id)
project.update_columns(
last_repository_check_failed: !check(project),
- last_repository_check_at: Time.now,
+ last_repository_check_at: Time.now
)
end
diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb
index efc99ec962a..cde5b45ad41 100644
--- a/app/workers/repository_fork_worker.rb
+++ b/app/workers/repository_fork_worker.rb
@@ -1,36 +1,50 @@
class RepositoryForkWorker
+ ForkError = Class.new(StandardError)
+
include Sidekiq::Worker
include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
+ sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
+
def perform(project_id, forked_from_repository_storage_path, source_path, target_path)
+ project = Project.find(project_id)
+
+ return unless start_fork(project)
+
Gitlab::Metrics.add_event(:fork_repository,
source_path: source_path,
target_path: target_path)
- project = Project.find_by_id(project_id)
-
- unless project.present?
- logger.error("Project #{project_id} no longer exists!")
- return
- end
-
result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_path,
project.repository_storage_path, target_path)
- unless result
- logger.error("Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}")
- project.mark_import_as_failed('The project could not be forked.')
- return
- end
+ raise ForkError, "Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}" unless result
project.repository.after_import
-
- unless project.valid_repo?
- logger.error("Project #{project_id} had an invalid repository after fork")
- project.mark_import_as_failed('The forked repository is invalid.')
- return
- end
+ raise ForkError, "Project #{project_id} had an invalid repository after fork" unless project.valid_repo?
project.import_finish
+ rescue ForkError => ex
+ fail_fork(project, ex.message)
+ raise
+ rescue => ex
+ return unless project
+
+ fail_fork(project, ex.message)
+ raise ForkError, "#{ex.class} #{ex.message}"
+ end
+
+ private
+
+ def start_fork(project)
+ return true if project.import_start
+
+ Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.")
+ false
+ end
+
+ def fail_fork(project, message)
+ Rails.logger.error(message)
+ project.mark_import_as_failed(message)
end
end
diff --git a/app/workers/repository_import_worker.rb b/app/workers/repository_import_worker.rb
index b33ba2ed7c1..2c2d1e8b91f 100644
--- a/app/workers/repository_import_worker.rb
+++ b/app/workers/repository_import_worker.rb
@@ -1,29 +1,45 @@
class RepositoryImportWorker
+ ImportError = Class.new(StandardError)
+
include Sidekiq::Worker
include DedicatedSidekiqQueue
- sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_EXPIRATION
-
- attr_accessor :project, :current_user
+ sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
def perform(project_id)
- @project = Project.find(project_id)
- @current_user = @project.creator
-
- Gitlab::Metrics.add_event(:import_repository,
- import_url: @project.import_url,
- path: @project.path_with_namespace)
+ project = Project.find(project_id)
- project.update_columns(import_jid: self.jid, import_error: nil)
+ return unless start_import(project)
- result = Projects::ImportService.new(project, current_user).execute
+ Gitlab::Metrics.add_event(:import_repository,
+ import_url: project.import_url,
+ path: project.full_path)
- if result[:status] == :error
- project.mark_import_as_failed(result[:message])
- return
- end
+ result = Projects::ImportService.new(project, project.creator).execute
+ raise ImportError, result[:message] if result[:status] == :error
project.repository.after_import
project.import_finish
+ rescue ImportError => ex
+ fail_import(project, ex.message)
+ raise
+ rescue => ex
+ return unless project
+
+ fail_import(project, ex.message)
+ raise ImportError, "#{ex.class} #{ex.message}"
+ end
+
+ private
+
+ def start_import(project)
+ return true if project.import_start
+
+ Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.")
+ false
+ end
+
+ def fail_import(project, message)
+ project.mark_import_as_failed(message)
end
end
diff --git a/app/workers/stuck_ci_jobs_worker.rb b/app/workers/stuck_ci_jobs_worker.rb
index ae8c980c9e4..8b0cfcc8af8 100644
--- a/app/workers/stuck_ci_jobs_worker.rb
+++ b/app/workers/stuck_ci_jobs_worker.rb
@@ -45,7 +45,7 @@ class StuckCiJobsWorker
def search(status, timeout)
builds = Ci::Build.where(status: status).where('ci_builds.updated_at < ?', timeout.ago)
- builds.joins(:project).includes(:tags, :runner, project: :namespace).find_each(batch_size: 50).each do |build|
+ builds.joins(:project).merge(Project.without_deleted).includes(:tags, :runner, project: :namespace).find_each(batch_size: 50).each do |build|
yield(build)
end
end
diff --git a/app/workers/stuck_import_jobs_worker.rb b/app/workers/stuck_import_jobs_worker.rb
index bfc5e667bb6..f850e459cd9 100644
--- a/app/workers/stuck_import_jobs_worker.rb
+++ b/app/workers/stuck_import_jobs_worker.rb
@@ -2,36 +2,60 @@ class StuckImportJobsWorker
include Sidekiq::Worker
include CronjobQueue
- IMPORT_EXPIRATION = 15.hours.to_i
+ IMPORT_JOBS_EXPIRATION = 15.hours.to_i
def perform
- stuck_projects.find_in_batches(batch_size: 500) do |group|
+ projects_without_jid_count = mark_projects_without_jid_as_failed!
+ projects_with_jid_count = mark_projects_with_jid_as_failed!
+
+ Gitlab::Metrics.add_event(:stuck_import_jobs,
+ projects_without_jid_count: projects_without_jid_count,
+ projects_with_jid_count: projects_with_jid_count)
+ end
+
+ private
+
+ def mark_projects_without_jid_as_failed!
+ started_projects_without_jid.each do |project|
+ project.mark_import_as_failed(error_message)
+ end.count
+ end
+
+ def mark_projects_with_jid_as_failed!
+ completed_jids_count = 0
+
+ started_projects_with_jid.find_in_batches(batch_size: 500) do |group|
jids = group.map(&:import_jid)
# Find the jobs that aren't currently running or that exceeded the threshold.
- completed_jids = Gitlab::SidekiqStatus.completed_jids(jids)
+ completed_jids = Gitlab::SidekiqStatus.completed_jids(jids).to_set
if completed_jids.any?
- completed_ids = group.select { |project| completed_jids.include?(project.import_jid) }.map(&:id)
+ completed_jids_count += completed_jids.count
+ group.each do |project|
+ project.mark_import_as_failed(error_message) if completed_jids.include?(project.import_jid)
+ end
- fail_batch!(completed_jids, completed_ids)
+ Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.to_a.join(', ')}")
end
end
- end
- private
+ completed_jids_count
+ end
- def stuck_projects
- Project.select('id, import_jid').with_import_status(:started).where.not(import_jid: nil)
+ def started_projects
+ Project.with_import_status(:started)
end
- def fail_batch!(completed_jids, completed_ids)
- Project.where(id: completed_ids).update_all(import_status: 'failed', import_error: error_message)
+ def started_projects_with_jid
+ started_projects.where.not(import_jid: nil)
+ end
- Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.join(', ')}")
+ def started_projects_without_jid
+ started_projects.where(import_jid: nil)
end
def error_message
- "Import timed out. Import took longer than #{IMPORT_EXPIRATION} seconds"
+ "Import timed out. Import took longer than #{IMPORT_JOBS_EXPIRATION} seconds"
end
end
diff --git a/app/workers/stuck_merge_jobs_worker.rb b/app/workers/stuck_merge_jobs_worker.rb
new file mode 100644
index 00000000000..7843179d77c
--- /dev/null
+++ b/app/workers/stuck_merge_jobs_worker.rb
@@ -0,0 +1,34 @@
+class StuckMergeJobsWorker
+ include Sidekiq::Worker
+ include CronjobQueue
+
+ def perform
+ stuck_merge_requests.find_in_batches(batch_size: 100) do |group|
+ jids = group.map(&:merge_jid)
+
+ # Find the jobs that aren't currently running or that exceeded the threshold.
+ completed_jids = Gitlab::SidekiqStatus.completed_jids(jids)
+
+ if completed_jids.any?
+ completed_ids = group.select { |merge_request| completed_jids.include?(merge_request.merge_jid) }.map(&:id)
+
+ apply_current_state!(completed_jids, completed_ids)
+ end
+ end
+ end
+
+ private
+
+ def apply_current_state!(completed_jids, completed_ids)
+ merge_requests = MergeRequest.where(id: completed_ids)
+
+ merge_requests.where.not(merge_commit_sha: nil).update_all(state: :merged)
+ merge_requests.where(merge_commit_sha: nil).update_all(state: :opened)
+
+ Rails.logger.info("Updated state of locked merge jobs. JIDs: #{completed_jids.join(', ')}")
+ end
+
+ def stuck_merge_requests
+ MergeRequest.select('id, merge_jid').with_state(:locked).where.not(merge_jid: nil).reorder(nil)
+ end
+end
diff --git a/app/workers/system_hook_worker.rb b/app/workers/system_hook_worker.rb
deleted file mode 100644
index 55d4e7d6dab..00000000000
--- a/app/workers/system_hook_worker.rb
+++ /dev/null
@@ -1,10 +0,0 @@
-class SystemHookWorker
- include Sidekiq::Worker
- include DedicatedSidekiqQueue
-
- sidekiq_options retry: 4
-
- def perform(hook_id, data, hook_name)
- SystemHook.find(hook_id).execute(data, hook_name)
- end
-end
diff --git a/app/workers/trigger_schedule_worker.rb b/app/workers/trigger_schedule_worker.rb
deleted file mode 100644
index 9c1baf7e6c5..00000000000
--- a/app/workers/trigger_schedule_worker.rb
+++ /dev/null
@@ -1,18 +0,0 @@
-class TriggerScheduleWorker
- include Sidekiq::Worker
- include CronjobQueue
-
- def perform
- Ci::TriggerSchedule.active.where("next_run_at < ?", Time.now).find_each do |trigger_schedule|
- begin
- Ci::CreateTriggerRequestService.new.execute(trigger_schedule.project,
- trigger_schedule.trigger,
- trigger_schedule.ref)
- rescue => e
- Rails.logger.error "#{trigger_schedule.id}: Failed to trigger_schedule job: #{e.message}"
- ensure
- trigger_schedule.schedule_next_run!
- end
- end
- end
-end
diff --git a/app/workers/update_user_activity_worker.rb b/app/workers/update_user_activity_worker.rb
index b3c2f13aa33..31bbdb69edb 100644
--- a/app/workers/update_user_activity_worker.rb
+++ b/app/workers/update_user_activity_worker.rb
@@ -7,8 +7,8 @@ class UpdateUserActivityWorker
ids = pairs.keys
conditions = 'WHEN id = ? THEN ? ' * ids.length
- User.where(id: ids).
- update_all([
+ User.where(id: ids)
+ .update_all([
"last_activity_on = CASE #{conditions} ELSE last_activity_on END",
*pairs.to_a.flatten
])
diff --git a/app/workers/project_web_hook_worker.rb b/app/workers/web_hook_worker.rb
index d973e662ff2..713c0228040 100644
--- a/app/workers/project_web_hook_worker.rb
+++ b/app/workers/web_hook_worker.rb
@@ -1,11 +1,13 @@
-class ProjectWebHookWorker
+class WebHookWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
- sidekiq_options retry: 4
+ sidekiq_options retry: 4, dead: false
def perform(hook_id, data, hook_name)
+ hook = WebHook.find(hook_id)
data = data.with_indifferent_access
- WebHook.find(hook_id).execute(data, hook_name)
+
+ WebHookService.new(hook, data, hook_name).execute
end
end